Posted to common-commits@hadoop.apache.org by st...@apache.org on 2009/05/27 13:07:25 UTC

svn commit: r779111 - in /hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server: datanode/ namenode/

Author: stevel
Date: Wed May 27 11:07:24 2009
New Revision: 779111

URL: http://svn.apache.org/viewvc?rev=779111&view=rev
Log:
HADOOP-3628 bring the filesystem under the lifecycle, including BackupNode

Modified:
    hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
    hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java

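Note for readers without the rest of the HADOOP-3628 branch: the patch below codes against an org.apache.hadoop.util.Service base class that is not part of this commit. The following is a minimal sketch of that class, reconstructed purely from the call sites in this diff. The method and state names match what the diff uses, but the bodies, the ServiceState values and the ServiceStatus class are assumptions, and the real class has members this sketch omits (enterState(), enterLiveState(), LivenessException).

    // Hypothetical reconstruction of the Service lifecycle base class; the
    // real implementation lives elsewhere on the HADOOP-3628 branch.
    package org.apache.hadoop.util;

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;

    public abstract class Service extends Configured implements Closeable {

      /** Lifecycle states; the diff references STARTED, LIVE and CLOSED. */
      public enum ServiceState { CREATED, STARTED, LIVE, FAILED, CLOSED }

      /** Accumulates health problems reported by innerPing(). */
      public static class ServiceStatus {
        private final List<Throwable> throwables = new ArrayList<Throwable>();
        public void addThrowable(Throwable t) { throwables.add(t); }
        public List<Throwable> getThrowables() { return throwables; }
      }

      private volatile ServiceState state = ServiceState.CREATED;

      protected Service(Configuration conf) {
        super(conf);
      }

      public ServiceState getServiceState() { return state; }

      protected void setServiceState(ServiceState newState) { state = newState; }

      /** Start the service; on startup failure, close it and rethrow. */
      public void start() throws IOException {
        state = ServiceState.STARTED;
        try {
          innerStart();
        } catch (IOException e) {
          closeQuietly();
          throw e;
        }
      }

      /** Subclasses perform their startup work here. */
      protected abstract void innerStart() throws IOException;

      /** Subclasses add liveness failures to the supplied status here. */
      public void innerPing(ServiceStatus status) throws IOException {
      }

      /** Subclasses release their resources here. */
      protected void innerClose() throws IOException {
      }

      /** Close without propagating exceptions; a real version would log them. */
      public final void closeQuietly() {
        try {
          close();
        } catch (IOException ignored) {
        }
      }

      @Override
      public void close() throws IOException {
        state = ServiceState.CLOSED;
        innerClose();
      }

      /** Create-then-start helper used by the factory methods in this patch. */
      public static void startService(Service service) throws IOException {
        service.start();
      }
    }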
Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=779111&r1=779110&r2=779111&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed May 27 11:07:24 2009
@@ -45,7 +45,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -98,6 +97,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.Service;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -130,7 +130,7 @@
  * information to clients or other DataNodes that might be interested.
  *
  **********************************************************/
-public class DataNode extends Configured 
+public class DataNode extends Service
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants, Runnable {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
   
@@ -204,7 +204,8 @@
   
   /** Activated plug-ins. */
   private List<ServicePlugin> plugins;
-  
+  /** data directories */
+  private AbstractList<File> dataDirs;
   private static final Random R = new Random();
   
   // For InterDataNodeProtocol
@@ -220,20 +221,55 @@
 
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
-   * 'dataDirs' is where the blocks are stored.
+   * 'dataDirs' is where the blocks are stored. This constructor does not start
+   * the node; it merely initializes it.
+   *
+   * @param conf     configuration to use
+   * @param dataDirs list of directories that may be used for data
+   * @throws IOException for historical reasons
    */
-  DataNode(Configuration conf, 
+  DataNode(Configuration conf,
            AbstractList<File> dataDirs) throws IOException {
     super(conf);
     DataNode.setDataNode(this);
-    try {
-      startDataNode(conf, dataDirs);
-    } catch (IOException ie) {
-      shutdown();
-      throw ie;
-    }
+    this.dataDirs = dataDirs;
+  }
+
+  /////////////////////////////////////////////////////
+  /////////////////////////////////////////////////////
+  // Lifecycle
+  /////////////////////////////////////////////////////
+
+  /**
+   * Start any work (in separate threads).
+   *
+   * @throws IOException for any startup failure
+   */
+  @Override
+  public void innerStart() throws IOException {
+    startDataNode(getConf(), dataDirs);
   }
     
+  /**
+   * {@inheritDoc}.
+   *
+   * This implementation checks for the IPC server running and the
+   * DataNode being registered to a namenode.
+   *
+   * @param status the initial status
+   * @throws IOException       for any ping failure
+   * @throws LivenessException if the IPC server is not defined 
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (ipcServer == null) {
+      status.addThrowable(new LivenessException("No IPC Server running"));
+    }
+    if (dnRegistration == null) {
+      status.addThrowable(
+              new LivenessException("Not registered to a namenode"));
+    }
+  }
   
   /**
    * This method starts the data node with the specified conf.
@@ -362,6 +398,9 @@
     int tmpInfoPort = infoSocAddr.getPort();
     this.infoServer = new HttpServer("datanode", infoHost, tmpInfoPort,
         tmpInfoPort == 0, conf);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
+    }
     if (conf.getBoolean("dfs.https.enable", false)) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
@@ -369,6 +408,9 @@
       Configuration sslConf = new Configuration(false);
       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
           "ssl-server.xml"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
+      }
       this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
     }
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
@@ -435,6 +477,10 @@
         } catch (InterruptedException ie) {}
       }
     }
+    if(!shouldRun) {
+      throw new IOException("Datanode shut down during handshake with NameNode "
+               + getNameNodeAddr());
+    }
     String errorMsg = null;
     // verify build version
     if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
@@ -539,10 +585,14 @@
    * @see FSNamesystem#registerDatanode(DatanodeRegistration)
    * @throws IOException
    */
-  private void register() throws IOException {
+  protected void register() throws IOException {
     if (dnRegistration.getStorageID().equals("")) {
       setNewStorageID(dnRegistration);
     }
+    //if we are LIVE, move into the STARTED state, as registration implies that
+    //the node is no longer LIVE
+    enterState(ServiceState.LIVE, ServiceState.STARTED);
+    //spin until the server is up.
     while(shouldRun) {
       try {
         // reset name to machineName. Mainly for web interface.
@@ -593,39 +643,55 @@
       dnRegistration.exportedKeys = ExportedAccessKeys.DUMMY_KEYS;
     }
 
+    //at this point the DataNode considers itself live.
+    enterLiveState();
     // random short delay - helps scatter the BR from all DNs
     scheduleBlockReport(initialBlockReportDelay);
   }
 
+
+  /**
+   * Shut down this instance of the datanode. Returns only after shutdown is
+   * complete.
+   */
+  public void shutdown() {
+    closeQuietly();
+  }
+
   /**
    * Shut down this instance of the datanode.
    * Returns only after shutdown is complete.
    * This method can only be called by the offerService thread.
    * Otherwise, deadlock might occur.
    */
-  public void shutdown() {
-    if (plugins != null) {
-      for (ServicePlugin p : plugins) {
+  @Override
+  protected void innerClose() throws IOException {
+    synchronized (this) {
+      //disable the should run flag first, so that everything out there starts
+      //to shut down
+      shouldRun = false;
+      if (plugins != null) {
+        for (ServicePlugin p : plugins) {
+          try {
+            p.stop();
+            LOG.info("Stopped plug-in " + p);
+          } catch (Throwable t) {
+            LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+          }
+        }
+      }
+
+      if (infoServer != null) {
         try {
-          p.stop();
-          LOG.info("Stopped plug-in " + p);
-        } catch (Throwable t) {
-          LOG.warn("ServicePlugin " + p + " could not be stopped", t);
+          infoServer.stop();
+        } catch (Exception e) {
+          LOG.warn("Exception shutting down DataNode", e);
         }
       }
-    }
-    
-    if (infoServer != null) {
-      try {
-        infoServer.stop();
-      } catch (Exception e) {
-        LOG.warn("Exception shutting down DataNode", e);
+      if (ipcServer != null) {
+        ipcServer.stop();
       }
     }
-    if (ipcServer != null) {
-      ipcServer.stop();
-    }
-    this.shouldRun = false;
     if (dataXceiverServer != null) {
       ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
       this.dataXceiverServer.interrupt();
@@ -671,6 +737,8 @@
       try {
         this.storage.unlockAll();
       } catch (IOException ie) {
+        LOG.warn("Ignoring exception when unlocking storage: "+ie,
+                ie);
       }
     }
     if (dataNodeThread != null) {
@@ -1271,7 +1339,9 @@
         startDistributedUpgradeIfNeeded();
         offerService();
       } catch (Exception ex) {
-        LOG.error("Exception: " + StringUtils.stringifyException(ex));
+        LOG.error("Exception while in state " + getServiceState()
+                + " and shouldRun=" + shouldRun + ": " + ex,
+                ex);
         if (shouldRun) {
           try {
             Thread.sleep(5000);
@@ -1351,33 +1421,51 @@
    * @param conf Configuration instance to use.
    * @return DataNode instance for given list of data dirs and conf, or null if
    * no directory from this directory list can be created.
-   * @throws IOException
+   * @throws IOException if problems occur when starting the data node
    */
   public static DataNode makeInstance(String[] dataDirs, Configuration conf)
     throws IOException {
     ArrayList<File> dirs = new ArrayList<File>();
-    for (int i = 0; i < dataDirs.length; i++) {
-      File data = new File(dataDirs[i]);
+    StringBuffer invalid = new StringBuffer();
+    for (String dataDir : dataDirs) {
+      File data = new File(dataDir);
       try {
         DiskChecker.checkDir(data);
         dirs.add(data);
       } catch(DiskErrorException e) {
-        LOG.warn("Invalid directory in dfs.data.dir: " + e.getMessage());
+        LOG.warn("Invalid directory in dfs.data.dir: " + e, e);
+        invalid.append(dataDir);
+        invalid.append(" ");
       }
     }
-    if (dirs.size() > 0) 
-      return new DataNode(conf, dirs);
-    LOG.error("All directories in dfs.data.dir are invalid.");
-    return null;
+    if (dirs.size() > 0) {
+      DataNode dataNode = new DataNode(conf, dirs);
+      Service.startService(dataNode);
+      return dataNode;
+    } else {
+      LOG.error("All directories in dfs.data.dir are invalid: " + invalid);
+      return null;
+    }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @return the name of this service
+   */
+  @Override
+  public String getServiceName() {
+    return "DataNode";
+  }
+  
   @Override
   public String toString() {
-    return "DataNode{" +
+    return getServiceName() + " {" +
       "data=" + data +
       ", localName='" + dnRegistration.getName() + "'" +
       ", storageID='" + dnRegistration.getStorageID() + "'" +
       ", xmitsInProgress=" + xmitsInProgress.get() +
+      ", state=" + getServiceState() +
       "}";
   }
   

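With the DataNode changes above, construction and startup are now distinct steps, and makeInstance() performs both. A hedged usage sketch follows: makeInstance(), innerPing() and shutdown() are in the diff above, but the ServiceStatus construction assumes the sketch at the top of this mail.

    // Sketch: driving a DataNode through the new lifecycle. makeInstance()
    // now starts the node internally via Service.startService(), so callers
    // get back a started node, or null if no data dir was usable.
    Configuration conf = new Configuration();
    String[] dataDirs = conf.getStrings("dfs.data.dir");
    DataNode dn = DataNode.makeInstance(dataDirs, conf);
    if (dn != null) {
      // innerPing() reports a missing IPC server or a node that has not
      // registered with a namenode, instead of throwing directly
      Service.ServiceStatus status = new Service.ServiceStatus();
      dn.innerPing(status);
      if (!status.getThrowables().isEmpty()) {
        // ... surface the failures to monitoring
      }
      dn.shutdown();   // backwards-compatible wrapper around closeQuietly()
    }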
Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=779111&r1=779110&r2=779111&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed May 27 11:07:24 2009
@@ -179,6 +179,9 @@
       }
 
       File blockFiles[] = dir.listFiles();
+      if (blockFiles == null) {
+        throw new IllegalStateException("Not a valid directory: " + dir);
+      }
       for (int i = 0; i < blockFiles.length; i++) {
         if (Block.isBlockFilename(blockFiles[i])) {
           long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);

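The FSDataset change is a defensive fix: File.listFiles() returns null, not an empty array, when the path is not a directory or cannot be read, and the old code would then fail with an unhelpful NullPointerException inside the loop. The failure mode in isolation:

    import java.io.File;

    // File.listFiles() returns null when 'dir' is not a readable directory;
    // iterating over the result without a check throws NullPointerException.
    File dir = new File("/path/that/is/not/a/directory");
    File[] blockFiles = dir.listFiles();
    if (blockFiles == null) {
      // the patch converts the eventual NullPointerException into a
      // diagnosable IllegalStateException that names the offending directory
      throw new IllegalStateException("Not a valid directory: " + dir);
    }
    for (File f : blockFiles) {
      // ... process block files
    }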
Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java?rev=779111&r1=779110&r2=779111&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BackupNode.java Wed May 27 11:07:24 2009
@@ -130,8 +130,17 @@
     runCheckpointDaemon(conf);
   }
 
+  /**
+   * {@inheritDoc} 
+   * <p/> 
+   * When shutting down, this service shuts down the checkpoint manager.
+   * If registered to a namenode, it reports that it is shutting down
+   * via {@link NameNode#errorReport(NamenodeRegistration, int, String)} 
+   *
+   * @throws IOException for any IO problem
+   */
   @Override // NameNode
-  public void stop() {
+  public void innerClose() throws IOException {
     if(checkpointManager != null) checkpointManager.shouldRun = false;
     if(cpDaemon != null) cpDaemon.interrupt();
     if(namenode != null && getRegistration() != null) {
@@ -143,7 +152,17 @@
       }
     }
     RPC.stopProxy(namenode); // stop the RPC threads
-    super.stop();
+    super.innerClose();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return "BackupNode"
+   */
+  @Override
+  public String getServiceName() {
+    return "BackupNode";
   }
 
   /////////////////////////////////////////////////////

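The BackupNode change follows the same pattern as the DataNode: the old public stop() override becomes an innerClose() that cleans up the subclass's own resources before delegating up the chain. A minimal sketch of the idiom, assuming the Service base class sketched earlier; stopCheckpointing() is a hypothetical stand-in for the checkpoint-manager cleanup shown in the diff.

    // Sketch of the subclass close chain: release subclass resources first,
    // then let the superclass (NameNode, and ultimately Service) finish.
    @Override
    public void innerClose() throws IOException {
      stopCheckpointing();          // hypothetical subclass-specific cleanup
      super.innerClose();           // NameNode stops its threads and metrics
    }

Because innerClose() can throw IOException, the backwards-compatible stop() methods retained in this patch route through closeQuietly() instead.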
Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=779111&r1=779110&r2=779111&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed May 27 11:07:24 2009
@@ -106,7 +106,7 @@
       }
   };
 
-  private static final void logAuditEvent(UserGroupInformation ugi,
+  private static void logAuditEvent(UserGroupInformation ugi,
       InetAddress addr, String cmd, String src, String dst,
       FileStatus stat) {
     final Formatter fmt = auditFormatter.get();
@@ -433,6 +433,51 @@
   }
 
   /**
+   * Test whether a thread reference is null or refers to a dead thread.
+   * @param thread the thread to check
+   * @return true if the thread is considered dead
+   */
+  private boolean isDead(Thread thread) {
+    return thread == null || !thread.isAlive();
+  }
+
+  /**
+   * Perform a cursory health check of the namesystem, particularly that it
+   * has not been closed and that all of its worker threads are running.
+   * @throws IOException for any health check failure
+   */
+  void ping() throws IOException {
+    if (!fsRunning) {
+      throw new IOException("Namesystem is not running");
+    }
+    boolean bad = false;
+    StringBuilder sb = new StringBuilder();
+    if (isDead(hbthread)) {
+      bad = true;
+      sb.append("[Heartbeat thread is dead]");
+    }
+    if (isDead(replthread)) {
+      bad = true;
+      sb.append("[Replication thread is dead]");
+    }
+    // this thread's liveness is only relevant in safe mode.
+    if (safeMode != null && isDead(smmthread)) {
+      bad = true;
+      sb.append("[SafeModeMonitor thread is dead while the name system is in safe mode]");
+    }
+    if (isDead(dnthread)) {
+      bad = true;
+      sb.append("[DecommissionedMonitor thread is dead]");
+    }
+    if (isDead(lmthread)) {
+      bad = true;
+      sb.append("[Lease monitor thread is dead]");
+    }
+    if (bad) {
+      throw new IOException(sb.toString());
+    }
+  }
+  /**
    * Close down this file system manager.
    * Causes heartbeat and lease daemons to stop; waits briefly for
    * them to finish, but a short timeout returns control back to caller.
@@ -454,7 +499,10 @@
           lmthread.interrupt();
           lmthread.join(3000);
         }
-        dir.close();
+        if (dir != null) {
+          dir.close();
+          dir = null;
+        }
       } catch (InterruptedException ie) {
       } catch (IOException ie) {
         LOG.error("Error closing FSDirectory", ie);
@@ -1117,7 +1165,10 @@
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +
-                            blockManager.minReplication);
+                            blockManager.minReplication
+                            + ". ( there are currently " 
+                            + heartbeats.size()
+                            +" live data nodes in the cluster)");
     }
 
     // Allocate a new block and record it in the INode. 

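FSNamesystem.ping() aggregates every thread-liveness failure into a single exception rather than failing on the first dead thread, so one health probe reports all problems at once. The same pattern in isolation, as a sketch (the fragment needs java.io.IOException; thread names are illustrative):

    // Sketch: aggregate liveness checks so a single probe reports
    // every dead worker thread, not just the first one found.
    private static boolean isDead(Thread t) {
      return t == null || !t.isAlive();
    }

    void ping(Thread... workers) throws IOException {
      StringBuilder sb = new StringBuilder();
      for (Thread t : workers) {
        if (isDead(t)) {
          sb.append('[')
            .append(t == null ? "a missing" : t.getName())
            .append(" thread is dead]");
        }
      }
      if (sb.length() > 0) {
        throw new IOException(sb.toString());
      }
    }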
Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=779111&r1=779110&r2=779111&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed May 27 11:07:24 2009
@@ -78,6 +78,7 @@
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Service;
 import org.apache.hadoop.util.ServicePlugin;
 import org.apache.hadoop.util.StringUtils;
 
@@ -115,7 +116,7 @@
  * secondary namenodes or rebalancing processes to get partial namenode's
  * state, for example partial blocksMap etc.
  **********************************************************/
-public class NameNode implements ClientProtocol, DatanodeProtocol,
+public class NameNode extends Service implements ClientProtocol, DatanodeProtocol,
                                  NamenodeProtocol, FSConstants,
                                  RefreshAuthorizationPolicyProtocol {
   static{
@@ -369,7 +370,7 @@
   }
 
   /**
-   * Start NameNode.
+   * Create a NameNode.
    * <p>
    * The name-node can be started with one of the following startup options:
    * <ul> 
@@ -400,12 +401,90 @@
   }
 
   protected NameNode(Configuration conf, NamenodeRole role) throws IOException {
+    super(conf);
     this.role = role;
-    try {
-      initialize(conf);
-    } catch (IOException e) {
-      this.stop();
-      throw e;
+  }
+
+  /**
+   * The toString() operator returns the superclass name/id, and the state. This
+   * gives all services a slightly more useful message in a debugger or test report.
+   *
+   * @return a string representation of the object.
+   */
+  @Override
+  public String toString() {
+    return super.toString()
+            + (httpAddress != null ? (" at " + httpAddress + ", ") : "")
+            + (server != null ? (" IPC " + server.getListenerAddress()) : "");
+  }
+
+  /////////////////////////////////////////////////////
+  // Service Lifecycle and other methods
+  /////////////////////////////////////////////////////
+  
+  /**
+   * {@inheritDoc}
+   *
+   * @return "NameNode"
+   */
+  @Override
+  public String getServiceName() {
+    return "NameNode";
+  }
+  
+  /**
+   * This method does all the startup. It is invoked from {@link #start()} when
+   * needed.
+   *
+   * This implementation delegates all the work to the (overridable)
+   * {@link #initialize(Configuration)} method, then calls
+   * {@link #setServiceState(ServiceState)} to mark the service as live.
+   * Any subclasses that do not consider themselves to be live once
+   * any subclassed initialize method has returned should override the method
+   * {@link #goLiveAtTheEndOfStart()} to change that behavior.
+   * @throws IOException for any problem.
+   */
+  @Override
+  protected void innerStart() throws IOException {
+    initialize(getConf());
+    if(goLiveAtTheEndOfStart()) {
+      setServiceState(ServiceState.LIVE);
+    }
+  }
+
+  /**
+   * Override point: should the NameNode enter the live state at the end of
+   * the {@link #innerStart()} operation?
+   * @return true if the service should enter the live state at this point,
+   * false to leave the service in its current state.
+   */
+  protected boolean goLiveAtTheEndOfStart() {
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}.
+   *
+   * This implementation checks for the name system being non-null and live
+   *
+   * @param status status response to build up
+   * @throws IOException       for IO failure; this will be caught and included
+   * in the status message
+   */
+  @Override
+  public void innerPing(ServiceStatus status) throws IOException {
+    if (namesystem == null) {
+      status.addThrowable(new LivenessException("No name system"));
+    } else {
+      try {
+        namesystem.ping();
+      } catch (IOException e) {
+        status.addThrowable(e);
+      }
+    }
+    if (httpServer == null || !httpServer.isAlive()) {
+      status.addThrowable(
+              new IOException("NameNode HttpServer is not running"));
     }
   }
 
@@ -415,15 +494,33 @@
    */
   public void join() {
     try {
-      this.server.join();
+      if (server != null) {
+        server.join();
+      }
     } catch (InterruptedException ie) {
     }
   }
 
   /**
    * Stop all NameNode threads and wait for all to finish.
+   * <p/>
+   * Retained for backwards compatibility.
+   */
+  public final void stop() {
+    closeQuietly();
+  }
+
+  /**
+   * {@inheritDoc} 
+   * <p/>
+   * To shut down, this service stops all NameNode threads and
+   * waits for them to finish. It also stops the metrics.
+   * @throws IOException for any IO problem
    */
-  public void stop() {
+  @Override
+  public synchronized void innerClose() throws IOException {
+    LOG.info("Closing " + getServiceName());
+
     if (stopRequested)
       return;
     stopRequested = true;
@@ -441,14 +538,23 @@
     } catch (Exception e) {
       LOG.error(StringUtils.stringifyException(e));
     }
-    if(namesystem != null) namesystem.close();
-    if(emptier != null) emptier.interrupt();
-    if(server != null) server.stop();
+    if(namesystem != null) {
+      namesystem.close();
+    }
+    if(emptier != null) {
+      emptier.interrupt();
+      emptier = null;
+    }
+    if(server != null) {
+      server.stop();
+      server = null;
+    }
     if (myMetrics != null) {
       myMetrics.shutdown();
     }
     if (namesystem != null) {
       namesystem.shutdown();
+      namesystem = null;
     }
   }
   
@@ -1154,9 +1260,13 @@
         System.exit(aborted ? 1 : 0);
       case BACKUP:
       case CHECKPOINT:
-        return new BackupNode(conf, startOpt.toNodeRole());
+        BackupNode backupNode = new BackupNode(conf, startOpt.toNodeRole());
+        startService(backupNode);
+        return backupNode;
       default:
-        return new NameNode(conf);
+        NameNode nameNode = new NameNode(conf);
+        startService(nameNode);
+        return nameNode;
     }
   }
     

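The createNameNode() change at the bottom of this file shows the migration pattern of the whole patch: constructors no longer start services, so every factory constructs first and then calls startService(), which is expected to close the half-built service if startup fails. A hedged sketch of that contract, using the assumed Service API from the top of this mail:

    // Sketch: the create-then-start contract. Construction is cheap and
    // side-effect free; startService() runs innerStart() and is assumed to
    // close the service before rethrowing on failure, so callers never
    // leak a half-started node.
    NameNode nameNode = new NameNode(conf);   // initializes, does not start
    Service.startService(nameNode);           // starts it; cleans up on failure
    // the node is now LIVE (see goLiveAtTheEndOfStart() above)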
Modified: hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java?rev=779111&r1=779110&r2=779111&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java (original)
+++ hadoop/core/branches/HADOOP-3628-2/src/hdfs/org/apache/hadoop/hdfs/server/namenode/PendingReplicationBlocks.java Wed May 27 11:07:24 2009
@@ -131,6 +131,15 @@
   }
 
   /**
+   * Test for the replicator being alive.
+   * @return true if the thread is running.
+   */
+  boolean isAlive() {
+    Daemon daemon = timerThread;
+    return daemon != null && daemon.isAlive();
+  }
+
+  /**
    * An object that contains information about a block that 
    * is being replicated. It records the timestamp when the 
    * system started replicating the most recent copy of this