You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/27 23:28:32 UTC

svn commit: r580166 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/

Author: stack
Date: Thu Sep 27 14:28:31 2007
New Revision: 580166

URL: http://svn.apache.org/viewvc?rev=580166&view=rev
Log:
HADOOP-1928 Have master pass the regionserver the filesystem to use

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/build.xml
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/OOMERegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Sep 27 14:28:31 2007
@@ -81,6 +81,7 @@
     HADOOP-1884 Remove useless debugging log messages from hbase.mapred
     HADOOP-1856 Add Jar command to hbase shell using Hadoop RunJar util
                 (Edward Yoon via Stack)
+    HADOOP-1928 Have master pass the regionserver the filesystem to use
 
 
 Below are the list of changes before 2007-08-18

Modified: lucene/hadoop/trunk/src/contrib/hbase/build.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/build.xml?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/build.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/build.xml Thu Sep 27 14:28:31 2007
@@ -94,6 +94,7 @@
     <pathelement location="${conf.dir}"/>
     <pathelement location="${hadoop.root}/build"/>
     <pathelement location="${src.test}"/>
+    <pathelement location="${root}/conf"/>
     <path refid="classpath"/>
   </path>
   

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Thu Sep 27 14:28:31 2007
@@ -937,12 +937,11 @@
   /**
    * Checks to see if the file system is still accessible.
    * If not, sets closed
-   * 
    * @return false if file system is not available
    */
   protected boolean checkFileSystem() {
     if (fsOk) {
-      if (!FSUtils.isFileSystemAvailable(fs)) {
+      if (!FSUtils.isFileSystemAvailable(fs, closed)) {
         LOG.fatal("Shutting down HBase cluster: file system not available");
         closed.set(true);
         fsOk = false;
@@ -1127,9 +1126,9 @@
    * HMasterRegionInterface
    */
 
-  /** {@inheritDoc} */
   @SuppressWarnings("unused")
-  public void regionServerStartup(HServerInfo serverInfo) throws IOException {
+  public MapWritable regionServerStartup(HServerInfo serverInfo)
+  throws IOException {
     String s = serverInfo.getServerAddress().toString().trim();
     HServerInfo storedInfo = null;
     LOG.info("received start message from: " + s);
@@ -1137,11 +1136,9 @@
     // If we get the startup message but there's an old server by that
     // name, then we can timeout the old one right away and register
     // the new one.
-
     synchronized (serversToServerInfo) {
       storedInfo = serversToServerInfo.remove(s);
       HServerLoad load = serversToLoad.remove(s);
-    
       if (load != null) {
         Set<String> servers = loadToServers.get(load);
         if (servers != null) {
@@ -1160,7 +1157,6 @@
     }
 
     // Either way, record the new server
-
     synchronized (serversToServerInfo) {
       HServerLoad load = new HServerLoad();
       serverInfo.setLoad(load);
@@ -1178,6 +1174,22 @@
       long serverLabel = getServerLabel(s);
       serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
     }
+    
+    return createConfigurationSubset();
+  }
+  
+  /**
+   * @return Subset of configuration to pass to initializing regionservers:
+   * e.g. the filesystem and root directory to use.
+   */
+  protected MapWritable createConfigurationSubset() {
+    MapWritable mw = addConfig(new MapWritable(), HConstants.HBASE_DIR);
+    return addConfig(mw, "fs.default.name");
+  }
+
+  private MapWritable addConfig(final MapWritable mw, final String key) {
+    mw.put(new Text(key), new Text(this.conf.get(key)));
+    return mw;
   }
 
   private long getServerLabel(final String s) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java Thu Sep 27 14:28:31 2007
@@ -20,6 +20,8 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
+
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.ipc.VersionedProtocol;
 
 /**
@@ -34,8 +36,10 @@
    * Called when a region server first starts
    * @param info
    * @throws IOException
+   * @return Configuration for the regionserver to use: e.g. filesystem,
+   * hbase rootdir, etc.
    */
-  public void regionServerStartup(HServerInfo info) throws IOException;
+  public MapWritable regionServerStartup(HServerInfo info) throws IOException;
   
   /**
    * Called to renew lease, tell master what the region server is doing and to

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Thu Sep 27 14:28:31 2007
@@ -23,6 +23,7 @@
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -55,6 +56,7 @@
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
@@ -67,22 +69,12 @@
 public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   static final Log LOG = LogFactory.getLog(HRegionServer.class);
   
-  /** {@inheritDoc} */
-  public long getProtocolVersion(final String protocol, 
-      @SuppressWarnings("unused") final long clientVersion)
-  throws IOException {  
-    if (protocol.equals(HRegionInterface.class.getName())) {
-      return HRegionInterface.versionID;
-    }
-    throw new IOException("Unknown protocol to name node: " + protocol);
-  }
-  
   // Set when a report to the master comes back with a message asking us to
   // shutdown.  Also set by call to stop when debugging or running unit tests
   // of HRegionServer in isolation. We use AtomicBoolean rather than
   // plain boolean so we can pass a reference to Chore threads.  Otherwise,
   // Chore threads need to know about the hosting class.
-  protected AtomicBoolean stopRequested = new AtomicBoolean(false);
+  protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
   
   // Go down hard.  Used if file system becomes unavailable and also in
   // debugging and unit tests.
@@ -91,38 +83,35 @@
   // If false, the file system has become unavailable
   protected volatile boolean fsOk;
   
-  final Path rootDir;
   protected final HServerInfo serverInfo;
   protected final Configuration conf;
-  private final Random rand;
+  private final Random rand = new Random();
   
   // region name -> HRegion
-  protected final SortedMap<Text, HRegion> onlineRegions;
+  protected final SortedMap<Text, HRegion> onlineRegions =
+    Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
   protected final Map<Text, HRegion> retiringRegions =
     new HashMap<Text, HRegion>();
  
   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final Vector<HMsg> outboundMsgs;
+  private final Vector<HMsg> outboundMsgs = new Vector<HMsg>();
 
   int numRetries;
   protected final int threadWakeFrequency;
   private final int msgInterval;
-  
-  // File paths
-  private FileSystem fs;
 
   // Remote HMaster
-  private HMasterRegionInterface hbaseMaster;
+  private final HMasterRegionInterface hbaseMaster;
 
   // Server to handle client requests.  Default access so can be accessed by
   // unit tests.
-  Server server;
+  final Server server;
   
   // Leases
-  private Leases leases;
+  private final Leases leases;
   
   // Request counter
-  private AtomicInteger requestCount;
+  private final AtomicInteger requestCount = new AtomicInteger();
   
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
@@ -134,7 +123,7 @@
   // interrupted.
   protected final Integer splitOrCompactLock = new Integer(0);
   
-  /**
+  /*
    * Runs periodically to determine if regions need to be compacted or split
    */
   class SplitOrCompactChecker extends Chore
@@ -150,7 +139,6 @@
         30 * 1000), stop);
     }
 
-    /** {@inheritDoc} */
     public void closing(final Text regionName) {
       lock.writeLock().lock();
       try {
@@ -166,7 +154,6 @@
       }
     }
     
-    /** {@inheritDoc} */
     public void closed(final Text regionName) {
       lock.writeLock().lock();
       try {
@@ -290,7 +277,6 @@
       super(period, stop);
     }
     
-    /** {@inheritDoc} */
     @Override
     protected void chore() {
       synchronized(cacheFlusherLock) {
@@ -326,8 +312,9 @@
     }
   }
   
-  // HLog and HLog roller.
-  protected final HLog log;
+  // HLog and HLog roller.  log is protected rather than private to avoid
+  // eclipse warning when accessed by inner classes
+  protected HLog log;
   private final Thread logRollerThread;
   protected final Integer logRollerLock = new Integer(0);
   
@@ -375,32 +362,21 @@
    * @throws IOException
    */
   public HRegionServer(Configuration conf) throws IOException {
-    this(new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)),
-        new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
-          DEFAULT_REGIONSERVER_ADDRESS)),
-        conf);
+    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+        DEFAULT_REGIONSERVER_ADDRESS)), conf);
   }
   
   /**
    * Starts a HRegionServer at the specified location
-   * @param rootDir
    * @param address
    * @param conf
    * @throws IOException
    */
-  public HRegionServer(Path rootDir, HServerAddress address,
-      Configuration conf)
+  public HRegionServer(HServerAddress address, Configuration conf)
   throws IOException {  
     this.abortRequested = false;
     this.fsOk = true;
-    this.rootDir = rootDir;
     this.conf = conf;
-    this.rand = new Random();
-    this.onlineRegions =
-      Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
-    
-    this.outboundMsgs = new Vector<HMsg>();
-    this.requestCount = new AtomicInteger();
 
     // Config'ed params
     this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
@@ -416,112 +392,26 @@
       new SplitOrCompactChecker(this.stopRequested);
     
     // Task thread to process requests from Master
-    this.toDo = new LinkedBlockingQueue<ToDoEntry>();
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
     this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
-
-    try {
-      // Server to handle client requests
-      this.server = RPC.getServer(this, address.getBindAddress(), 
-        address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
-        false, conf);
-
-      // Use interface to get the 'real' IP for this host.
-      // 'serverInfo' is sent to master.  Should have the real IP of this host
-      // rather than 'localhost' or 0.0.0.0 or 127.0.0.1 in it.
-      String realIP = DNS.getDefaultIP(
-        conf.get("dfs.datanode.dns.interface","default"));
-      this.serverInfo = new HServerInfo(new HServerAddress(
-        new InetSocketAddress(realIP, server.getListenerAddress().getPort())),
-        this.rand.nextLong());
-      Path logdir = new Path(rootDir, "log" + "_" + realIP + "_" +
-        this.serverInfo.getServerAddress().getPort());
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Log dir " + logdir);
-      }
-      
-      // Logging
-      this.fs = FileSystem.get(conf);
-      if(fs.exists(logdir)) {
-        throw new RegionServerRunningException("region server already " +
-          "running at " + this.serverInfo.getServerAddress().toString() +
-          " because logdir " + logdir.toString() + " exists");
-      }
-      
-      this.log = new HLog(fs, logdir, conf);
-      this.logRollerThread =
-        new LogRoller(this.threadWakeFrequency, stopRequested);
-
-      // Remote HMaster
-      this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
-          HMasterRegionInterface.class, HMasterRegionInterface.versionID,
-          new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
-          conf);
-    } catch (IOException e) {
-      this.stopRequested.set(true);
-      throw RemoteExceptionHandler.checkIOException(e);
-    }
-  }
-
-  /** @return the HLog */
-  HLog getLog() {
-    return log;
-  }
-
-  /**
-   * Sets a flag that will cause all the HRegionServer threads to shut down
-   * in an orderly fashion.  Used by unit tests and called by {@link Flusher}
-   * if it judges server needs to be restarted.
-   */
-  synchronized void stop() {
-    this.stopRequested.set(true);
-    notifyAll();                        // Wakes run() if it is sleeping
-  }
-  
-  /**
-   * Cause the server to exit without closing the regions it is serving, the
-   * log it is using and without notifying the master.
-   * Used unit testing and on catastrophic events such as HDFS is yanked out
-   * from under hbase or we OOME.
-   */
-  synchronized void abort() {
-    this.abortRequested = true;
-    stop();
-  }
-
-  /** 
-   * Wait on all threads to finish.
-   * Presumption is that all closes and stops have already been called.
-   */
-  void join() {
-    try {
-      this.workerThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.logRollerThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.cacheFlusherThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.splitOrCompactCheckerThread.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    try {
-      this.server.join();
-    } catch(InterruptedException iex) {
-      // continue
-    }
-    LOG.info("HRegionServer stopped at: " +
-      serverInfo.getServerAddress().toString());
+    this.logRollerThread =
+      new LogRoller(this.threadWakeFrequency, stopRequested);
+    // Server to handle client requests
+    this.server = RPC.getServer(this, address.getBindAddress(), 
+      address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+      false, conf);
+    this.serverInfo = new HServerInfo(new HServerAddress(
+      new InetSocketAddress(getThisIP(),
+      this.server.getListenerAddress().getPort())), this.rand.nextLong());
+     this.leases = new Leases(
+       conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
+       this.threadWakeFrequency);
+    // Remote HMaster
+    this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
+        HMasterRegionInterface.class, HMasterRegionInterface.versionID,
+        new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
+        conf);
   }
 
   /**
@@ -530,21 +420,13 @@
    * load/unload instructions.
    */
   public void run() {
-    startAllServices();
-    
     // Set below if HMaster asked us stop.
     boolean masterRequestedStop = false;
     
     try {
+      init(reportForDuty());
       while(!stopRequested.get()) {
         long lastMsg = 0;
-        try {
-          reportForDuty();
-        } catch(IOException e) {
-          this.sleeper.sleep(lastMsg);
-          continue;
-        }
-
         // Now ask master what it wants us to do and tell it what we have done
         for (int tries = 0; !stopRequested.get();) {
           if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
@@ -630,7 +512,7 @@
       LOG.fatal("Unhandled exception. Aborting...", t);
       abort();
     }
-    leases.closeAfterLeasesExpire();
+    this.leases.closeAfterLeasesExpire();
     this.worker.stop();
     this.server.stop();
     
@@ -691,10 +573,52 @@
         serverInfo.getServerAddress().toString());
     }
 
-    join(); 
+    join();
     LOG.info(Thread.currentThread().getName() + " exiting");
   }
-
+  
+  /*
+   * Run init. Sets up hlog and starts up all server threads.
+   * @param c Extra configuration.
+   */
+  private void init(final MapWritable c) {
+    try {
+      for (Map.Entry<Writable, Writable> e: c.entrySet()) {
+        String key = e.getKey().toString();
+        String value = e.getValue().toString();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Config from master: " + key + "=" + value);
+        }
+        this.conf.set(key, value);
+      }
+      this.log = setupHLog();
+      startServiceThreads();
+    } catch (IOException e) {
+      this.stopRequested.set(true);
+      LOG.fatal("Failed init",
+        RemoteExceptionHandler.checkIOException(e));
+    }
+  }
+  
+  private HLog setupHLog()
+  throws RegionServerRunningException, IOException {
+    String rootDir = this.conf.get(HConstants.HBASE_DIR);
+    LOG.info("Root dir: " + rootDir);
+    Path logdir = new Path(new Path(rootDir),
+      "log" + "_" + getThisIP() + "_" +
+      this.serverInfo.getServerAddress().getPort());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Log dir " + logdir);
+    }
+    FileSystem fs = FileSystem.get(this.conf);
+    if (fs.exists(logdir)) {
+      throw new RegionServerRunningException("region server already " +
+        "running at " + this.serverInfo.getServerAddress().toString() +
+        " because logdir " + logdir.toString() + " exists");
+    }
+    return new HLog(fs, logdir, conf);
+  }
+  
   /*
    * Start Chore Threads, Server, Worker and lease checker threads. Install an
    * UncaughtExceptionHandler that calls abort of RegionServer if we get
@@ -707,7 +631,7 @@
    * Chore, it keeps its own internal stop mechanism so needs to be stopped
    * by this hosting server.  Worker logs the exception and exits.
    */
-  private void startAllServices() {
+  private void startServiceThreads() throws IOException {
     String n = Thread.currentThread().getName();
     UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
       public void uncaughtException(Thread t, Throwable e) {
@@ -728,41 +652,105 @@
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
     // Leases is not a Thread. Internally it runs a daemon thread.  If it gets
     // an unhandled exception, it will just exit.
-    this.leases = new Leases(
-      conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
-      this.threadWakeFrequency);
     this.leases.setName(n + ".leaseChecker");
     this.leases.start();
     // Start Server.  This service is like leases in that it internally runs
     // a thread.
-    try {
-      this.server.start();
-      LOG.info("HRegionServer started at: " +
+    this.server.start();
+    LOG.info("HRegionServer started at: " +
         serverInfo.getServerAddress().toString());
-    } catch(IOException e) {
-      this.stopRequested.set(true);
-      LOG.fatal("Failed start Server",
-        RemoteExceptionHandler.checkIOException(e));
+  }
+
+  /** @return the HLog */
+  HLog getLog() {
+    return this.log;
+  }
+
+  /*
+   * Use interface to get the 'real' IP for this host. 'serverInfo' is sent to
+   * master.  Should have the real IP of this host rather than 'localhost' or
+   * 0.0.0.0 or 127.0.0.1 in it.
+   * @return This server's IP.
+   */
+  private String getThisIP() throws UnknownHostException {
+    return DNS.getDefaultIP(conf.get("dfs.datanode.dns.interface","default"));
+  }
+
+  /**
+   * Sets a flag that will cause all the HRegionServer threads to shut down
+   * in an orderly fashion.  Used by unit tests and called by {@link Flusher}
+   * if it judges server needs to be restarted.
+   */
+  synchronized void stop() {
+    this.stopRequested.set(true);
+    notifyAll();                        // Wakes run() if it is sleeping
+  }
+  
+  /**
+   * Cause the server to exit without closing the regions it is serving, the
+   * log it is using and without notifying the master.
+   * Used in unit testing and on catastrophic events such as HDFS being yanked
+   * out from under hbase or an OOME.
+   */
+  synchronized void abort() {
+    this.abortRequested = true;
+    stop();
+  }
+
+  /** 
+   * Wait on all threads to finish.
+   * Presumption is that all closes and stops have already been called.
+   */
+  void join() {
+    join(this.workerThread);
+    join(this.logRollerThread);
+    join(this.cacheFlusherThread);
+    join(this.splitOrCompactCheckerThread);
+    try {
+      this.server.join();
+    } catch (InterruptedException e) {
+      // No means of asking server if it's done... so just assume it is even
+      // if interrupted.
+    }
+  }
+
+  private void join(final Thread t) {
+    while (t.isAlive()) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        // continue
+      }
     }
   }
   
   /*
    * Let the master know we're here
-   * @throws IOException
+   * Run initialization using parameters passed us by the master.
    */
-  private void reportForDuty() throws IOException {
+  private MapWritable reportForDuty() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Telling master we are up");
     }
-    this.requestCount.set(0);
-    this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
-    this.hbaseMaster.regionServerStartup(serverInfo);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Done telling master we are up");
+    MapWritable result = null;
+    while(!stopRequested.get()) {
+      long lastMsg = 0;
+      try {
+        this.requestCount.set(0);
+        this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
+        result = this.hbaseMaster.regionServerStartup(serverInfo);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Done telling master we are up");
+        }
+        break;
+      } catch(IOException e) {
+        this.sleeper.sleep(lastMsg);
+        continue;
+      }
     }
+    return result;
   }
 
-
   /** Add to the outbound message buffer */
   private void reportOpen(HRegion region) {
     synchronized(outboundMsgs) {
@@ -808,7 +796,7 @@
       this.msg = msg;
     }
   }
-  BlockingQueue<ToDoEntry> toDo;
+  BlockingQueue<ToDoEntry> toDo = new LinkedBlockingQueue<ToDoEntry>();
   private Worker worker;
   private Thread workerThread;
   
@@ -886,7 +874,8 @@
   void openRegion(HRegionInfo regionInfo) throws IOException {
     HRegion region = onlineRegions.get(regionInfo.regionName);
     if(region == null) {
-      region = new HRegion(rootDir, log, fs, conf, regionInfo, null);
+      region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
+        this.log, FileSystem.get(conf), conf, regionInfo, null);
       this.lock.writeLock().lock();
       try {
         this.log.setSequenceNumber(region.getMaxSequenceId());
@@ -1275,7 +1264,13 @@
    */
   protected boolean checkFileSystem() {
     if (this.fsOk) {
-      if (!FSUtils.isFileSystemAvailable(fs)) {
+      FileSystem fs = null;
+      try {
+        fs = FileSystem.get(this.conf);
+      } catch (IOException e) {
+        LOG.error("Failed get of filesystem", e);
+      }
+      if (fs != null && !FSUtils.isFileSystemAvailable(fs, stopRequested)) {
         LOG.fatal("Shutting down HRegionServer: file system not available");
         this.abortRequested = true;
         this.stopRequested.set(true);
@@ -1306,6 +1301,15 @@
       }
     }
     return regionsToCheck;
+  }
+
+  public long getProtocolVersion(final String protocol, 
+      @SuppressWarnings("unused") final long clientVersion)
+  throws IOException {  
+    if (protocol.equals(HRegionInterface.class.getName())) {
+      return HRegionInterface.versionID;
+    }
+    throw new IOException("Unknown protocol to name node: " + protocol);
   }
 
   //

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java Thu Sep 27 14:28:31 2007
@@ -20,11 +20,13 @@
 package org.apache.hadoop.hbase.util;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.dfs.DistributedFileSystem;
 
 /**
@@ -36,23 +38,28 @@
   /**
    * Not instantiable
    */
-  private FSUtils() {}
+  private FSUtils() {super();}
   
   /**
    * Checks to see if the specified file system is available
    * 
    * @param fs
+   * @param closed Optional flag.  If non-null and set, aborts the test of the
+   * filesystem.  Presumed to be a flag shared by multiple threads; another may
+   * already have determined the filesystem -- or something else -- is bad.
    * @return true if the specified file system is available.
    */
-  public static boolean isFileSystemAvailable(FileSystem fs) {
+  public static boolean isFileSystemAvailable(final FileSystem fs,
+      final AtomicBoolean closed) {
     if (!(fs instanceof DistributedFileSystem)) {
       return true;
     }
     boolean available = false;
     DistributedFileSystem dfs = (DistributedFileSystem) fs;
     int maxTries = dfs.getConf().getInt("hbase.client.retries.number", 3);
-    Path root = new Path(dfs.getConf().get("hbase.dir", "/"));
-    for (int i = 0; i < maxTries; i++) {
+    Path root =
+      fs.makeQualified(new Path(dfs.getConf().get(HConstants.HBASE_DIR, "/")));
+    for (int i = 0; i < maxTries && (closed == null || !closed.get()); i++) {
       IOException ex = null;
       try {
         if (dfs.exists(root)) {
@@ -62,12 +69,10 @@
       } catch (IOException e) {
         ex = e;
       }
-      String exception = "";
-      if (ex != null) {
-        exception = ": " + ex.getMessage();
-      }
-      LOG.info("Failed exists test on " + root + " (Attempt " + i + ")" +
-          exception);
+      String exception = (ex == null)? "": ": " + ex.getMessage();
+      LOG.info("Failed exists test on " + root + " by thread " +
+        Thread.currentThread().getName() + " (Attempt " + i + " of " +
+        maxTries  +"): " + exception);
     }
     try {
       if (!available) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Thu Sep 27 14:28:31 2007
@@ -63,10 +63,10 @@
       HConstants.DEFAULT_MAX_FILE_SIZE) <= 1024 * 1024);
 
     final int retries = 10; 
-    Path d = cluster.regionThreads.get(0).getRegionServer().rootDir;
     FileSystem fs = (cluster.getDFSCluster() == null) ?
       localFs : cluster.getDFSCluster().getFileSystem();
     assertNotNull(fs);
+    Path d = fs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR)));
 
     // Get connection on the meta table and get count of rows.
     

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/OOMERegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/OOMERegionServer.java?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/OOMERegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/OOMERegionServer.java Thu Sep 27 14:28:31 2007
@@ -24,7 +24,6 @@
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.io.Text;
 
@@ -42,9 +41,9 @@
     super(conf);
   }
 
-  public OOMERegionServer(Path rootDir, HServerAddress address,
-      Configuration conf) throws IOException {
-    super(rootDir, address, conf);
+  public OOMERegionServer(HServerAddress address, Configuration conf)
+  throws IOException {
+    super(address, conf);
   }
   
   @Override

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java?rev=580166&r1=580165&r2=580166&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java Thu Sep 27 14:28:31 2007
@@ -33,8 +33,6 @@
   /** constructor */
   public TestDFSAbort() {
     super();
-//    conf.setInt("ipc.client.timeout", 5000);            // reduce ipc client timeout
-//    conf.setInt("ipc.client.connect.max.retries", 5);   // and number of retries
     Logger.getRootLogger().setLevel(Level.WARN);
     Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
   }
@@ -66,4 +64,4 @@
   public static void main(@SuppressWarnings("unused") String[] args) {
     TestRunner.run(new TestSuite(TestDFSAbort.class));
   }
-}
+}
\ No newline at end of file