Posted to hdfs-commits@hadoop.apache.org by bo...@apache.org on 2011/02/23 21:22:36 UTC

svn commit: r1073927 - in /hadoop/hdfs/branches/HDFS-1052: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org...

Author: boryas
Date: Wed Feb 23 20:22:36 2011
New Revision: 1073927

URL: http://svn.apache.org/viewvc?rev=1073927&view=rev
Log:
HDFS-1634. Federation: Convert single threaded DataNode into per BlockPool thread model.
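In outline, the change replaces the DataNode's single offerService() loop with
one BPOfferService worker per configured namenode; each worker runs its own
handshake, registration, heartbeat and block-report cycle for its block pool.
A minimal sketch of the threading pattern (simplified from the diff below;
BPOfferServiceSketch and startWorkers are illustrative names, not the
committed code):

    import java.net.InetSocketAddress;
    import java.util.List;

    class BPOfferServiceSketch implements Runnable {
      private final InetSocketAddress nnAddr;   // one namenode per worker

      BPOfferServiceSketch(InetSocketAddress nnAddr) { this.nnAddr = nnAddr; }

      public void run() {
        // per-pool lifecycle: handshake with nnAddr, register,
        // then loop: heartbeat -> process commands -> block report
      }

      static void startWorkers(List<InetSocketAddress> namenodes) {
        for (InetSocketAddress nn : namenodes) {
          Thread t = new Thread(new BPOfferServiceSketch(nn), "BP-offer-" + nn);
          t.setDaemon(true);  // mirrors bpThread.setDaemon(true) in the diff
          t.start();
        }
      }
    }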

Added:
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
Modified:
    hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
    hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
    hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java

Modified: hadoop/hdfs/branches/HDFS-1052/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/CHANGES.txt?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1052/CHANGES.txt Wed Feb 23 20:22:36 2011
@@ -22,6 +22,9 @@ Trunk (unreleased changes)
     HDFS-1632. Federation: data node storage structure changes and
     introduce block pool storage. (tanping via suresh)
 
+    HDFS-1634. Federation: Convert single threaded DataNode into 
+    per BlockPool thread model. (boryas)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Feb 23 20:22:36 2011
@@ -223,4 +223,6 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_SECONDARY_NAMENODE_KRB_HTTPS_USER_NAME_KEY = "dfs.secondary.namenode.kerberos.https.principal";
   public static final String  DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY = "dfs.namenode.name.cache.threshold";
   public static final int     DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT = 10;
+  
+  public static final String DFS_FEDERATION_NAMENODES = "dfs.federation.namenodes.uri";
 }
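The new key is how a federated datanode can discover every namenode it must
serve (it is read via NameNode.getNNAddresses() in the DataNode changes
below). This hunk only defines the key name; assuming the value is a
comma-separated list of namenode URIs, setting it might look like
(hostnames are placeholders, and the value syntax is an assumption):

    Configuration conf = new HdfsConfiguration();
    // assumed value syntax -- the commit does not show it
    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES,
        "hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020");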

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java Wed Feb 23 20:22:36 2011
@@ -130,4 +130,11 @@ public class StorageInfo implements Writ
     }
     clusterID = cid;
   }
+  
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("lv=").append(layoutVersion).append(";cid=").append(clusterID)
+    .append(";nsid=").append(namespaceID).append(";c=").append(cTime);
+    return sb.toString();
+  }
 }
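For reference, the new toString() joins the four StorageInfo fields with
semicolons, so a storage with layoutVersion -35, clusterID "CID-1",
namespaceID 12345 and cTime 0 (illustrative values) prints as:

    lv=-35;cid=CID-1;nsid=12345;c=0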

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolStorage.java Wed Feb 23 20:22:36 2011
@@ -66,11 +66,12 @@ public class BlockPoolStorage extends St
     super(NodeType.DATA_NODE);
   }
 
-  BlockPoolStorage(int namespaceID, String bpID, long cTime) {
+  BlockPoolStorage(int namespaceID, String bpID, long cTime, String clusterId) {
     super(NodeType.DATA_NODE);
     this.namespaceID = namespaceID;
     this.blockpoolID = bpID;
     this.cTime = cTime;
+    this.clusterID = clusterId;
   }
 
   /**
@@ -508,4 +509,9 @@ public class BlockPoolStorage extends St
   public boolean isConversionNeeded(StorageDirectory sd) throws IOException {
     return false;
   }
+  
+  @Override
+  public String toString() {
+    return super.toString() + ";bpid=" + blockpoolID;
+  }
 }
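BlockPoolStorage.toString() chains the StorageInfo form above and appends the
pool id, so the same illustrative values for a pool "BP-1" print as:

    lv=-35;cid=CID-1;nsid=12345;c=0;bpid=BP-1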

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Feb 23 20:22:36 2011
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.lang.management.ManagementFactory;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
@@ -49,6 +50,9 @@ import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -126,11 +130,6 @@ import org.apache.hadoop.util.DiskChecke
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
-import java.lang.management.ManagementFactory;  
-
-import javax.management.MBeanServer; 
-import javax.management.ObjectName;
-
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
  * blocks for a DFS deployment.  A single deployment can
@@ -165,7 +164,7 @@ import javax.management.ObjectName;
 @InterfaceAudience.Private
 public class DataNode extends Configured 
     implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,
-    Runnable, DataNodeMXBean {
+    DataNodeMXBean {
   public static final Log LOG = LogFactory.getLog(DataNode.class);
   
   static{
@@ -196,23 +195,22 @@ public class DataNode extends Configured
     return NetUtils.createSocketAddr(target);
   }
   
+  BPOfferService[] nameNodeThreads;
+  private Map<String, BPOfferService> bpMapping = 
+    new HashMap<String, BPOfferService>();
   public DatanodeProtocol namenode = null;
   public FSDatasetInterface data = null;
   public DatanodeRegistration dnRegistration = null;
+  private String clusterId = null;
 
   volatile boolean shouldRun = true;
-  private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-  private LinkedList<String> delHints = new LinkedList<String>();
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
   Daemon dataXceiverServer = null;
   ThreadGroup threadGroup = null;
   long blockReportInterval;
-  //disallow the sending of BR before instructed to do so
-  long lastBlockReport = 0;
   boolean resetBlockReportTime = true;
   long initialBlockReportDelay = BLOCKREPORT_INITIAL_DELAY * 1000L;
-  long lastHeartbeat = 0;
   long heartBeatInterval;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
@@ -220,8 +218,7 @@ public class DataNode extends Configured
   private InetSocketAddress nameNodeAddr;
   private InetSocketAddress nameNodeAddrForClient;
   private InetSocketAddress selfAddr;
-  private static DataNode datanodeObject = null;
-  private Thread dataNodeThread = null;
+  static DataNode datanodeObject = null;
   String machineName;
   private static String dnThreadName;
   int socketTimeout;
@@ -243,8 +240,10 @@ public class DataNode extends Configured
   // For InterDataNodeProtocol
   public Server ipcServer;
 
-  private SecureResources secureResources = null;  
-  
+  private SecureResources secureResources = null;
+  private AbstractList<File> dataDirs;
+  private Configuration conf;
+
   /**
    * Create the DataNode given a configuration and an array of dataDirs.
    * 'dataDirs' is where the blocks are stored.
@@ -282,8 +281,14 @@ public class DataNode extends Configured
       startDataNode(conf, dataDirs, namenode, resources);
     } catch (IOException ie) {
       shutdown();
-     throw ie;
-   }
+      throw ie;
+    }
+  }
+
+  private synchronized void setClusterId(String cid) {
+    if(clusterId==null) {
+      clusterId = cid;
+    }
   }
 
   private void initConfig(Configuration conf) throws UnknownHostException {
@@ -296,9 +301,10 @@ public class DataNode extends Configured
                                      conf.get("dfs.datanode.dns.interface","default"),
                                      conf.get("dfs.datanode.dns.nameserver","default"));
     }
-    this.nameNodeAddr = NameNode.getServiceAddress(conf, true);
-    this.nameNodeAddrForClient = NameNode.getAddress(conf);
-    
+
+    // TODO:FEDERATION this.nameNodeAddr = NameNode.getServiceAddress(conf, true);
+    // TODO:FEDERATION this.nameNodeAddrForClient = NameNode.getAddress(conf);
+
     this.socketTimeout =  conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
                                       HdfsConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
@@ -361,44 +367,6 @@ public class DataNode extends Configured
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
   }
   
-  private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
-      throws IOException {
-    // get version and id info from the name-node
-    NamespaceInfo nsInfo = handshake();
-
-    StartupOption startOpt = getStartupOption(conf);
-    assert startOpt != null : "Startup option must be set.";
-    
-
-    boolean simulatedFSDataset = 
-        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
-    if (simulatedFSDataset) {
-        setNewStorageID(dnRegistration);
-        dnRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
-        dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
-        dnRegistration.storageInfo.clusterID = nsInfo.clusterID;
-        // it would have been better to pass storage as a parameter to
-        // constructor below - need to augment ReflectionUtils used below.
-        conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
-        try {
-          //Equivalent of following (can't do because Simulated is in test dir)
-          //  this.data = new SimulatedFSDataset(conf);
-          this.data = (FSDatasetInterface) ReflectionUtils.newInstance(
-              Class.forName("org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"), conf);
-        } catch (ClassNotFoundException e) {
-          throw new IOException(StringUtils.stringifyException(e));
-        }
-    } else { // real storage
-      // read storage info, lock data dirs and transition fs state if necessary
-      storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-      // adjust
-      this.dnRegistration.setStorageInfo(storage);
-      // initialize data node internal structure
-      this.data = new FSDataset(storage, conf);
-    }
-  }
-  
-
   private void startPlugins(Configuration conf) {
     plugins = conf.getInstances("dfs.datanode.plugins", ServicePlugin.class);
     for (ServicePlugin p: plugins) {
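The large hunk that follows moves the heartbeat/block-report loop into the
new per-pool BPOfferService class. Its block-report rescheduling keeps
reports on the original time grid by advancing lastBlockReport a whole number
of intervals; a worked check using the times from the diff's own comment
(last report 08:20:14, current time 11:35:43, 1 hour interval):

    long interval = 3600L * 1000;                    // 1 hour in ms
    long last = ((8 * 60 + 20) * 60 + 14) * 1000L;   // 08:20:14
    long now  = ((11 * 60 + 35) * 60 + 43) * 1000L;  // 11:35:43
    last += (now - last) / interval * interval;      // integer division
    // last is now 11:20:14, so the next report fires at 12:20:14,
    // exactly as the comment in the hunk predicts.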
@@ -474,6 +442,602 @@ public class DataNode extends Configured
     this.threadGroup.setDaemon(true); // auto destroy when empty
   }
   
+  // calls specific to BP
+  protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+    BPOfferService bpos = bpMapping.get(block.getPoolId());
+    if(bpos != null)
+      bpos.notifyNamenodeReceivedBlock(block, delHint); 
+    else
+      LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+          + block.getPoolId());
+  }
+
+  /**
+   * A thread per namenode to perform:
+   * <ul>
+   * <li> Pre-registration handshake with namenode</li>
+   * <li> Registration with namenode</li>
+   * <li> Send periodic heartbeats to the namenode</li>
+   * <li> Handle commands received from the namenode</li>
+   * </ul>
+   */
+  class BPOfferService implements Runnable {
+    final InetSocketAddress nn_addr;
+    DatanodeRegistration bpRegistration;
+    NamespaceInfo bpNSInfo;
+    long lastBlockReport = 0;
+    private Thread bpThread;
+    private DatanodeProtocol bpNamenode;
+    private String blockPoolId;
+    private long lastHeartbeat = 0;
+    private boolean initialized = false;
+    private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+    private final LinkedList<String> delHints = new LinkedList<String>();
+
+    BPOfferService(InetSocketAddress isa, DatanodeRegistration bpRegistration) {
+      this.bpRegistration = bpRegistration;
+      this.nn_addr = isa;
+    }
+
+    /**
+     * returns true if BP thread has completed initialization
+     * @return true if initialized
+     */
+    public boolean initialized() {
+      return initialized;
+    }
+    
+    public String getBlockPoolId() {
+      return blockPoolId;
+    }
+ 
+    void setNamespaceInfo(NamespaceInfo nsinfo) {
+      bpNSInfo = nsinfo;
+      this.blockPoolId = nsinfo.getBlockPoolID();
+      bpMapping.put(blockPoolId, this);
+    }
+
+    void setNameNode(DatanodeProtocol dnProtocol) {
+      this.bpNamenode = dnProtocol;
+    }
+
+    private NamespaceInfo handshake() throws IOException {
+      NamespaceInfo nsInfo = new NamespaceInfo();
+      while (shouldRun) {
+        try {
+          nsInfo = bpNamenode.versionRequest();
+          break;
+        } catch(SocketTimeoutException e) {  // namenode is busy
+          LOG.info("Problem connecting to server: " + nn_addr);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {}
+        }
+      }
+      LOG.info("handshake: nsifno= " + nsInfo);
+      // TODO:FEDERATION on version mismatch datanode should continue
+      // to retry
+      // verify build version
+      if(! nsInfo.getBuildVersion().equals(Storage.getBuildVersion())) {
+        String errorMsg = "Incompatible build versions: namenode BV = " 
+          + nsInfo.getBuildVersion() + "; datanode BV = "
+          + Storage.getBuildVersion();
+        LOG.fatal(errorMsg);
+        try {
+          bpNamenode.errorReport( bpRegistration,
+              DatanodeProtocol.NOTIFY, errorMsg );
+        } catch( SocketTimeoutException e ) {  // namenode is busy
+          LOG.info("Problem connecting to server: " + nn_addr);
+        }
+        throw new IOException( errorMsg );
+      }
+      assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+        "Data-node and name-node layout versions must be the same."
+        + "Expected: "+ FSConstants.LAYOUT_VERSION 
+        + " actual "+ nsInfo.getLayoutVersion();
+      return nsInfo;
+    }
+
+
+    void setupBP(Configuration conf, AbstractList<File> dataDirs) 
+    throws IOException {
+      // get NN proxy
+      DatanodeProtocol dnp = 
+        (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+            DatanodeProtocol.versionID, nn_addr, conf);
+      LOG.info("NN proxy created in BP="+blockPoolId + " for " + nn_addr);
+      setNameNode(dnp);
+
+      // handshake with NN
+      NamespaceInfo nsInfo = handshake();
+      LOG.info("received namespace info  nsInfo=" + nsInfo);
+      setNamespaceInfo(nsInfo);
+      setClusterId(nsInfo.clusterID);
+      
+      // setup storage..
+      StartupOption startOpt = getStartupOption(conf);
+      assert startOpt != null : "Startup option must be set.";
+
+      boolean simulatedFSDataset = 
+        conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+      if (simulatedFSDataset) {
+        bpRegistration.setStorageID(dnRegistration.getStorageID()); // same as mother DN
+        bpRegistration.storageInfo.layoutVersion = FSConstants.LAYOUT_VERSION;
+        bpRegistration.storageInfo.namespaceID = bpNSInfo.namespaceID;
+        bpRegistration.storageInfo.clusterID = bpNSInfo.clusterID;
+      //????        bpRegistration.storageInfo.blockpoolID = bpNSInfo.blockpoolID; // TODO:FEDERATION
+      } else {
+        // read storage info, lock data dirs and transition fs state if necessary          
+        storage.recoverTransitionRead(blockPoolId, bpNSInfo, dataDirs, startOpt);
+        LOG.info("in setUp setting up storage: nsid=" + storage.namespaceID +
+            ";bpid=" + blockPoolId + 
+            ";lv=" + storage.layoutVersion +
+            ";nsInfo=" + bpNSInfo);
+
+        // use BlockPoolStorage as storageInfo in registration.
+        bpRegistration.setStorageID(storage.getStorageID());
+        bpRegistration.setStorageInfo(storage.getBPStorage(blockPoolId));
+        //data.addStorage(blockPoolId, storage);
+      }      
+    }
+
+    /**
+     * This method arranges for the data node to send the block report at 
+     * the next heartbeat.
+     */
+    void scheduleBlockReport(long delay) {
+      if (delay > 0) { // send BR after random delay
+        lastBlockReport = System.currentTimeMillis()
+        - ( blockReportInterval - R.nextInt((int)(delay)));
+      } else { // send at next heartbeat
+        lastBlockReport = lastHeartbeat - blockReportInterval;
+      }
+      resetBlockReportTime = true; // reset future BRs for randomness
+    }
+
+    /**
+     * Report received blocks and delete hints to the Namenode
+     * @throws IOException
+     */
+    private void reportReceivedBlocks() throws IOException {
+      //check if there are newly received blocks
+      Block [] blockArray=null;
+      String [] delHintArray=null;
+      synchronized(receivedBlockList) {
+        synchronized(delHints){
+          int numBlocks = receivedBlockList.size();
+          if (numBlocks > 0) {
+            if(numBlocks!=delHints.size()) {
+              LOG.warn("Panic: receiveBlockList and delHints are not of " +
+              		"the same length" );
+            }
+            //
+            // Send newly-received blockids to namenode
+            //
+            blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+            delHintArray = delHints.toArray(new String[numBlocks]);
+          }
+        }
+      }
+      if (blockArray != null) {
+        if(delHintArray == null || delHintArray.length != blockArray.length ) {
+          LOG.warn("Panic: block array & delHintArray are not the same" );
+        }
+        bpNamenode.blockReceived(dnRegistration, blockPoolId, blockArray,
+            delHintArray);
+        synchronized(receivedBlockList) {
+          synchronized(delHints){
+            for(int i=0; i<blockArray.length; i++) {
+              receivedBlockList.remove(blockArray[i]);
+              delHints.remove(delHintArray[i]);
+            }
+          }
+        }
+      }
+    }
+
+    /*
+     * Informing the name node could take a long long time! Should we wait
+     * till namenode is informed before responding with success to the
+     * client? For now we don't.
+     */
+    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+      if(block==null || delHint==null) {
+        throw new IllegalArgumentException(
+            block==null?"Block is null":"delHint is null");
+      }
+      
+      if (!block.getPoolId().equals(blockPoolId)) {
+        LOG.warn("Block pool ID mismatch: " + block.getPoolId() + 
+            " vs. " + blockPoolId);
+        return;
+      }
+      
+      synchronized (receivedBlockList) {
+        synchronized (delHints) {
+          receivedBlockList.add(block.getLocalBlock());
+          delHints.add(delHint);
+          receivedBlockList.notifyAll();
+        }
+      }
+    }
+
+
+    /**
+     * Report the list of blocks to the Namenode
+     * @throws IOException
+     */
+    DatanodeCommand blockReport() throws IOException {
+      // send block report
+      DatanodeCommand cmd = null;
+      long startTime = now();
+      if (startTime - lastBlockReport > blockReportInterval) {
+        //
+        // Send latest block report if timer has expired.
+        // Get back a list of local block(s) that are obsolete
+        // and can be safely GC'ed.
+        //
+        long brStartTime = now();
+        BlockListAsLongs bReport = data.getBlockReport(/* TODO:FEDERATION pass blockPoolId*/);
+
+        // TODO:FEDERATION add support for pool ID
+        cmd = bpNamenode.blockReport(bpRegistration, blockPoolId, bReport
+            .getBlockListAsLongs());
+        long brTime = now() - brStartTime;
+        myMetrics.blockReports.inc(brTime);
+        LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
+            " blocks got processed in " + brTime + " msecs");
+        //
+        // If we have sent the first block report, then wait a random
+        // time before we start the periodic block reports.
+        //
+        if (resetBlockReportTime) {
+          lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
+          resetBlockReportTime = false;
+        } else {
+          /* say the last block report was at 8:20:14. The current report
+           * should have started around 9:20:14 (default 1 hour interval).
+           * If current time is :
+           *   1) normal like 9:20:18, next report should be at 10:20:14
+           *   2) unexpected like 11:35:43, next report should be at 12:20:14
+           */
+          lastBlockReport += (now() - lastBlockReport) /
+          blockReportInterval * blockReportInterval;
+        }
+        LOG.info("sent block report, processed command:" + cmd);
+      }
+      return cmd;
+    }
+    
+    
+    DatanodeCommand [] sendHeartBeat() throws IOException {
+      return bpNamenode.sendHeartbeat(bpRegistration,
+          data.getCapacity(),
+          data.getDfsUsed(),
+          data.getRemaining(),
+          xmitsInProgress.get(),
+          getXceiverCount());
+    }
+
+
+    /**
+     * Main loop for each BP thread. Run until shutdown,
+     * forever calling remote NameNode functions.
+     */
+    private void offerService() throws Exception {
+      LOG.info("For namenode " + nn_addr + " using BLOCKREPORT_INTERVAL of "
+          + blockReportInterval + "msec" + " Initial delay: "
+          + initialBlockReportDelay + "msec" + "; heartBeatInterval="
+          + heartBeatInterval);
+
+      //
+      // Now loop for a long time....
+      //
+      while (shouldRun) {
+        try {
+          long startTime = now();
+
+          //
+          // Every so often, send heartbeat or block-report
+          //
+          if (startTime - lastHeartbeat > heartBeatInterval) {
+            //
+            // All heartbeat messages include following info:
+            // -- Datanode name
+            // -- data transfer port
+            // -- Total capacity
+            // -- Bytes remaining
+            //
+            lastHeartbeat = startTime;
+            // TODO:FEDERATION include some global DN stats..
+            DatanodeCommand[] cmds = sendHeartBeat();
+            myMetrics.heartbeats.inc(now() - startTime);
+            if (!processCommand(cmds))
+              continue;
+          }
+
+          reportReceivedBlocks();
+
+          DatanodeCommand cmd = blockReport();
+          processCommand(cmd);
+
+          // start block scanner
+          if (blockScanner != null) {
+            synchronized(blockScanner) { // SHOULD BE MOVED OUT OF THE THREAD.. FEDERATION
+              if(blockScannerThread == null && upgradeManager.isUpgradeCompleted()) {
+                LOG.info("Starting Periodic block scanner.");
+                blockScannerThread = new Daemon(blockScanner);
+                blockScannerThread.start();
+              }
+            }
+          }
+
+          //
+          // There is no work to do; sleep until the heartbeat timer elapses, 
+          // or work arrives, and then iterate again.
+          //
+          long waitTime = heartBeatInterval - 
+          (System.currentTimeMillis() - lastHeartbeat);
+          synchronized(receivedBlockList) {
+            if (waitTime > 0 && receivedBlockList.size() == 0) {
+              try {
+                receivedBlockList.wait(waitTime);
+              } catch (InterruptedException ie) {
+              }
+            }
+          } // synchronized
+        } catch(RemoteException re) {
+          String reClass = re.getClassName();
+          if (UnregisteredNodeException.class.getName().equals(reClass) ||
+              DisallowedDatanodeException.class.getName().equals(reClass) ||
+              IncorrectVersionException.class.getName().equals(reClass)) {
+            LOG.warn("DataNode is shutting down: " + 
+                StringUtils.stringifyException(re));
+            shutdown();  // TODO:FEDERATION - ??? what to do here
+            return;
+          }
+          LOG.warn(StringUtils.stringifyException(re));
+          try {
+            long sleepTime = Math.min(1000, heartBeatInterval);
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+          }
+        } catch (IOException e) {
+          LOG.warn(StringUtils.stringifyException(e));
+        }
+      } // while (shouldRun)
+    } // offerService
+
+
+
+    /**
+     * Register one bp with the corresponding NameNode
+     * <p>
+     * The bpDatanode needs to register with the namenode on startup in order
+     * 1) to report which storage it is serving now and 
+     * 2) to receive a registrationID issued by the namenode
+     * to recognize registered datanodes.
+     * 
+     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+     * @throws IOException
+     */
+    void register() throws IOException {
+      LOG.info("in register: sid=" + bpRegistration.getStorageID() + ";SI="
+          + bpRegistration.storageInfo); 
+                
+      while(shouldRun) {
+        try {
+          // reset name to machineName. Mainly for web interface. Same for all BPs
+          bpRegistration.name = machineName + ":" + bpRegistration.getPort();
+          LOG.info("bpReg before =" + bpRegistration.storageInfo + 
+              ";sid=" + bpRegistration.storageID);
+          bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+          LOG.info("bpReg after =" + bpRegistration.storageInfo + 
+              ";sid=" + bpRegistration.storageID);
+          break;
+        } catch(SocketTimeoutException e) {  // namenode is busy
+          LOG.info("Problem connecting to server: " + nn_addr);
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {}
+        }
+      }
+
+      
+      // TODO:FEDERATION - re-evaluate the following three checks!!!!!
+      assert ("".equals(storage.getStorageID())
+          && !"".equals(bpRegistration.getStorageID()))
+          || storage.getStorageID().equals(bpRegistration.getStorageID()) :
+      "New storageID can be assigned only if data-node is not formatted";
+
+      if (storage.getStorageID().equals("")) {
+        storage.setStorageID(bpRegistration.getStorageID());
+        storage.writeAll();
+        LOG.info("New storage id " + bpRegistration.getStorageID()
+            + " is assigned to data-node " + bpRegistration.getName());
+      }
+      if(! storage.getStorageID().equals(bpRegistration.getStorageID())) {
+        throw new IOException("Inconsistent storage IDs. Name-node returned "
+            + bpRegistration.getStorageID() 
+            + ". Expecting " + storage.getStorageID());
+      }
+
+      if (!isBlockTokenInitialized) {
+        /* first time registering with NN */
+        ExportedBlockKeys keys = bpRegistration.exportedKeys;
+        isBlockTokenEnabled = keys.isBlockTokenEnabled();
+        if (isBlockTokenEnabled) {
+          long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
+          long blockTokenLifetime = keys.getTokenLifetime();
+          LOG.info("Block token params received from NN: keyUpdateInterval="
+              + blockKeyUpdateInterval / (60 * 1000)
+              + " min(s), tokenLifetime=" + blockTokenLifetime / (60 * 1000)
+              + " min(s)");
+          blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
+        }
+        isBlockTokenInitialized = true;
+      }
+
+      if (isBlockTokenEnabled) {
+        blockTokenSecretManager.setKeys(bpRegistration.exportedKeys);
+        bpRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
+      }
+
+      LOG.info("in register:" + ";bpDNR="+bpRegistration.storageInfo);
+
+      // random short delay - helps scatter the BR from all DNs
+      scheduleBlockReport(initialBlockReportDelay);
+    }
+
+
+    /**
+     * No matter what kind of exception we get, keep retrying to offerService().
+     * That's the loop that connects to the NameNode and provides basic DataNode
+     * functionality.
+     *
+     * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
+     */
+    public void run() {
+      LOG.info(bpRegistration + ": In BPOfferService.run, data = " + data + 
+          ";bp="+blockPoolId);
+
+      //init stuff
+      try {
+        // setup storage
+        setupBP(conf, dataDirs);
+        register();
+      } catch (IOException ioe) {
+        LOG.error(bpRegistration + ": Setup failed", ioe);
+        try {
+          // TODO:FEDERATION needs to unlock only this specific storage...
+          // and remove it....
+          storage.unlockAll(); 
+        } catch (Exception e) { 
+          LOG.warn("failed to unlock storage for dn: " + bpRegistration, e);
+        }
+        // TODO:FEDERATION should be local only
+        //shutdown();
+        return;
+      }
+
+      initialized = true; // bp is initialized;
+
+      while (shouldRun) {
+        try {
+          // TODO:FEDERATION needs to be moved too
+          startDistributedUpgradeIfNeeded();
+          offerService();
+        } catch (Exception ex) {
+          LOG.error("Exception: " + StringUtils.stringifyException(ex));
+          if (shouldRun) {
+            try {
+              Thread.sleep(5000);
+            } catch (InterruptedException ie) {
+            }
+          }
+        }
+      }
+
+      LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
+      shutdown();
+    }
+
+    /**
+     * Process an array of datanode commands
+     * 
+     * @param cmds an array of datanode commands
+     * @return true if further processing may be required or false otherwise. 
+     */
+    private boolean processCommand(DatanodeCommand[] cmds) {
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          try {
+            if (processCommand(cmd) == false) {
+              return false;
+            }
+          } catch (IOException ioe) {
+            LOG.warn("Error processing datanode Command", ioe);
+          }
+        }
+      }
+      return true;
+    }
+
+    /**
+     * 
+     * @param cmd
+     * @return true if further processing may be required or false otherwise. 
+     * @throws IOException
+     */
+    private boolean processCommand(DatanodeCommand cmd) throws IOException {
+      if (cmd == null)
+        return true;
+      final BlockCommand bcmd = 
+        cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+
+      switch(cmd.getAction()) {
+      case DatanodeProtocol.DNA_TRANSFER:
+        // Send a copy of a block to another datanode
+        transferBlocks(bcmd.getPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+        myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
+        break;
+      case DatanodeProtocol.DNA_INVALIDATE:
+        //
+        // Some local block(s) are obsolete and can be 
+        // safely garbage-collected.
+        //
+        Block toDelete[] = bcmd.getBlocks();
+        try {
+          if (blockScanner != null) {
+            blockScanner.deleteBlocks(bcmd.getPoolId(), toDelete);
+          }
+          // using global fsdataset
+          data.invalidate(bcmd.getPoolId(), toDelete);
+        } catch(IOException e) {
+          checkDiskError();
+          throw e;
+        }
+        myMetrics.blocksRemoved.inc(toDelete.length);
+        break;
+      case DatanodeProtocol.DNA_SHUTDOWN:
+        // shut down the data node
+        shutdown();  //TODO:FEDERATION  - we should not shutdown the whole datanode.
+        return false;
+      case DatanodeProtocol.DNA_REGISTER:
+        // namenode requested a registration - at start or if NN lost contact
+        LOG.info("DatanodeCommand action: DNA_REGISTER");
+        if (shouldRun) {
+          register();
+        }
+        break;
+      case DatanodeProtocol.DNA_FINALIZE:
+        // TODO:FEDERATION - global storage????? or per BP storage - add real BPID
+        storage.finalizeUpgrade("FAKE ID NEEDS TO BE REPLACED");
+        break;
+      case UpgradeCommand.UC_ACTION_START_UPGRADE:
+        // start distributed upgrade here
+        processDistributedUpgradeCommand((UpgradeCommand)cmd);
+        break;
+      case DatanodeProtocol.DNA_RECOVERBLOCK:
+        // TODO:FEDERATION - global storage????? or per BP storage
+        recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+        break;
+      case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+        LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+        if (isBlockTokenEnabled) {
+          blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
+        }
+        break;
+      default:
+        LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
+      }
+      return true;
+    }
+
+  }
+
   /**
    * This method starts the data node with the specified conf.
    * 
@@ -492,29 +1056,102 @@ public class DataNode extends Configured
       throw new RuntimeException("Cannot start secure cluster without " +
       "privileged resources.");
 
+    // settings global for all BPs in the Data Node
     this.secureResources = resources;
-    this.namenode = namenode;
+    this.dataDirs = dataDirs;
+    this.conf = conf;
+
     storage = new DataStorage();
     
+    // global DN settings
     initConfig(conf);
     registerMXBean();
+    initFsDataSet(conf, dataDirs); // TODO:FEDERATION should this be moved to after at least one storage is created..
     initDataXceiver(conf);
-    initFsDataSet(conf, dataDirs);
-    initBlockScanner(conf);
     startInfoServer(conf);
-  
+    initIpcServer(conf); // TODO:FEDERATION redirect the call appropriately 
+
     myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
-    // TODO check what code removed here
 
-    initIpcServer(conf);
+    // get all the NNs configured
+    nameNodeThreads = getAllNamenodes(conf);
+  }
+  
+  private void initFsDataSet(Configuration conf, AbstractList<File> dataDirs)
+  throws IOException {
+    // version and id info now come from the per-BP handshake in BPOfferService
+    boolean simulatedFSDataset = 
+      conf.getBoolean("dfs.datanode.simulateddatastorage", false);
+
+    if (simulatedFSDataset) {
+      
+      if(data == null) { // create FSDataset
+        setNewStorageID(dnRegistration);
+        conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY,
+            dnRegistration.getStorageID());
+        
+        // it would have been better to pass storage as a parameter to
+        // constructor below - need to augment ReflectionUtils used below.
+
+        try {
+          //TODO:FEDERATION Equivalent of following (can't do because Simulated is in test dir)
+          if(data==null) {
+            data = (FSDatasetInterface) ReflectionUtils.newInstance(
+              Class.forName(
+                  "org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset"),
+                  conf);
+          }
+        } catch (ClassNotFoundException e) {
+          throw new IOException(StringUtils.stringifyException(e));
+        }
+
+      }
+      // TODO:FEDERATION do we need set it to the general dnRegistration?????
+      // TODO:FEDERATION do we need LV,NSid, cid,bpid for datanode version file?
+      
+    } else {
+      if(data == null)
+        data = new FSDataset(storage, conf);
+    }
+  }
+
+
+  void postStartInit(Configuration conf, AbstractList<File> dataDirs)
+  throws IOException {
+
+    initBlockScanner(conf);
+
     startPlugins(conf);
     
     // BlockTokenSecretManager is created here, but it shouldn't be
     // used until it is initialized in register().
-    this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);
+    this.blockTokenSecretManager = new BlockTokenSecretManager(false, 0, 0);    
   }
 
   /**
+   * For each namenode create a BPOfferService object.
+   * Threads will be started later (outside of the DataNode constructor).
+   * @param conf
+   * @throws IOException
+   */
+  private BPOfferService[] getAllNamenodes(Configuration conf)
+      throws IOException {
+    if(nameNodeThreads != null)
+      return nameNodeThreads; // already initialized
+    
+    // get NNs addresses from the configuration
+    InetSocketAddress[] isas = NameNode.getNNAddresses(conf);
+
+    AbstractList<BPOfferService> al = new ArrayList<BPOfferService> (isas.length);
+    for(InetSocketAddress isa : isas) {
+      BPOfferService bpos = new BPOfferService(isa, dnRegistration);
+      al.add(bpos);
+    }
+    nameNodeThreads = new BPOfferService[isas.length];
+    return al.toArray(nameNodeThreads);
+  }
+  
+  /**
    * Determine the http server's effective addr
    */
   public static InetSocketAddress getInfoAddr(Configuration conf) {
@@ -540,48 +1177,12 @@ public class DataNode extends Configured
     return (socketWriteTimeout > 0) ? 
            SocketChannel.open().socket() : new Socket();                                   
   }
-  
-  private NamespaceInfo handshake() throws IOException {
-    NamespaceInfo nsInfo = new NamespaceInfo();
-    while (shouldRun) {
-      try {
-        nsInfo = namenode.versionRequest();
-        break;
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + getNameNodeAddr());
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {}
-      }
-    }
-    String errorMsg = null;
-    // verify build version
-    if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
-      errorMsg = "Incompatible build versions: namenode BV = " 
-        + nsInfo.getBuildVersion() + "; datanode BV = "
-        + Storage.getBuildVersion();
-      LOG.fatal( errorMsg );
-      try {
-        namenode.errorReport( dnRegistration,
-                              DatanodeProtocol.NOTIFY, errorMsg );
-      } catch( SocketTimeoutException e ) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + getNameNodeAddr());
-      }
-      throw new IOException( errorMsg );
-    }
-    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
-      "Data-node and name-node layout versions must be the same."
-      + "Expected: "+ FSConstants.LAYOUT_VERSION + " actual "+ nsInfo.getLayoutVersion();
-    return nsInfo;
-  }
 
   private static void setDataNode(DataNode node) {
     datanodeObject = node;
   }
 
-  /** Return the DataNode object
-   * 
-   */
+  /** Return the DataNode object */
   public static DataNode getDataNode() {
     return datanodeObject;
   } 
@@ -661,73 +1262,6 @@ public class DataNode extends Configured
     dnReg.storageID = "DS-" + rand + "-"+ ip + "-" + dnReg.getPort() + "-" + 
                       System.currentTimeMillis();
   }
-  /**
-   * Register datanode
-   * <p>
-   * The datanode needs to register with the namenode on startup in order
-   * 1) to report which storage it is serving now and 
-   * 2) to receive a registrationID 
-   * issued by the namenode to recognize registered datanodes.
-   * 
-   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
-   * @throws IOException
-   */
-  private void register() throws IOException {
-    if (dnRegistration.getStorageID().equals("")) {
-      setNewStorageID(dnRegistration);
-    }
-    while(shouldRun) {
-      try {
-        // reset name to machineName. Mainly for web interface.
-        dnRegistration.name = machineName + ":" + dnRegistration.getPort();
-        dnRegistration = namenode.registerDatanode(dnRegistration);
-        break;
-      } catch(SocketTimeoutException e) {  // namenode is busy
-        LOG.info("Problem connecting to server: " + getNameNodeAddr());
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ie) {}
-      }
-    }
-    assert ("".equals(storage.getStorageID()) 
-            && !"".equals(dnRegistration.getStorageID()))
-            || storage.getStorageID().equals(dnRegistration.getStorageID()) :
-            "New storageID can be assigned only if data-node is not formatted";
-    if (storage.getStorageID().equals("")) {
-      storage.setStorageID(dnRegistration.getStorageID());
-      storage.writeAll();
-      LOG.info("New storage id " + dnRegistration.getStorageID()
-          + " is assigned to data-node " + dnRegistration.getName());
-    }
-    if(! storage.getStorageID().equals(dnRegistration.getStorageID())) {
-      throw new IOException("Inconsistent storage IDs. Name-node returned "
-          + dnRegistration.getStorageID() 
-          + ". Expecting " + storage.getStorageID());
-    }
-    
-    if (!isBlockTokenInitialized) {
-      /* first time registering with NN */
-      ExportedBlockKeys keys = dnRegistration.exportedKeys;
-      this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
-      if (isBlockTokenEnabled) {
-        long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-        long blockTokenLifetime = keys.getTokenLifetime();
-        LOG.info("Block token params received from NN: keyUpdateInterval="
-            + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-            + blockTokenLifetime / (60 * 1000) + " min(s)");
-        blockTokenSecretManager.setTokenLifetime(blockTokenLifetime);
-      }
-      isBlockTokenInitialized = true;
-    }
-
-    if (isBlockTokenEnabled) {
-      blockTokenSecretManager.setKeys(dnRegistration.exportedKeys);
-      dnRegistration.exportedKeys = ExportedBlockKeys.DUMMY_KEYS;
-    }
-
-    // random short delay - helps scatter the BR from all DNs
-    scheduleBlockReport(initialBlockReportDelay);
-  }
 
   /**
    * Shut down this instance of the datanode.
@@ -788,6 +1322,16 @@ public class DataNode extends Configured
       }
     }
     
+    // shutdown BPOS threads TODO:FEDERATION - review if this is enough
+    for(BPOfferService bpos : nameNodeThreads) {
+      if(bpos != null && bpos.bpThread!=null) {
+        try {
+          bpos.bpThread.interrupt();
+          bpos.bpThread.join();
+        } catch (InterruptedException ie) {}
+      }
+    }
+    
     RPC.stopProxy(namenode); // stop the RPC threads
     
     if(upgradeManager != null)
@@ -806,13 +1350,6 @@ public class DataNode extends Configured
         LOG.warn("Exception when unlocking storage: " + ie, ie);
       }
     }
-    if (dataNodeThread != null) {
-      dataNodeThread.interrupt();
-      try {
-        dataNodeThread.join();
-      } catch (InterruptedException ie) {
-      }
-    }
     if (data != null) {
       data.shutdown();
     }
@@ -869,7 +1406,7 @@ public class DataNode extends Configured
     
     
     if(hasEnoughResource) {
-      scheduleBlockReport(0);
+      scheduleAllBlockReport(0);
       return; // do not shutdown
     }
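scheduleAllBlockReport() replaces the old single-thread scheduleBlockReport()
here; its body is not part of this mail, but presumably it fans the request
out to every BPOfferService. A sketch under that assumption:

    // assumed shape -- the method body does not appear in this diff
    void scheduleAllBlockReport(long delay) {
      for (BPOfferService bpos : nameNodeThreads) {
        bpos.scheduleBlockReport(delay);
      }
    }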
     
@@ -882,184 +1419,6 @@ public class DataNode extends Configured
     return threadGroup == null ? 0 : threadGroup.activeCount();
   }
     
-  /**
-   * Main loop for the DataNode.  Runs until shutdown,
-   * forever calling remote NameNode functions.
-   */
-  public void offerService() throws Exception {
-     
-    LOG.info("using BLOCKREPORT_INTERVAL of " + blockReportInterval + "msec" + 
-       " Initial delay: " + initialBlockReportDelay + "msec");
-
-    //
-    // Now loop for a long time....
-    //
-    while (shouldRun) {
-      try {
-        long startTime = now();
-
-        //
-        // Every so often, send heartbeat or block-report
-        //
-        
-        if (startTime - lastHeartbeat > heartBeatInterval) {
-          //
-          // All heartbeat messages include following info:
-          // -- Datanode name
-          // -- data transfer port
-          // -- Total capacity
-          // -- Bytes remaining
-          //
-          lastHeartbeat = startTime;
-          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
-                                                       data.getCapacity(),
-                                                       data.getDfsUsed(),
-                                                       data.getRemaining(),
-                                                       xmitsInProgress.get(),
-                                                       getXceiverCount());
-          myMetrics.heartbeats.inc(now() - startTime);
-          //LOG.info("Just sent heartbeat, with name " + localName);
-          if (!processCommand(cmds))
-            continue;
-        }
-            
-        reportReceivedBlocks();
-
-        DatanodeCommand cmd = blockReport();
-        processCommand(cmd);
-
-        // start block scanner
-        if (blockScanner != null && blockScannerThread == null &&
-            upgradeManager.isUpgradeCompleted()) {
-          LOG.info("Starting Periodic block scanner.");
-          blockScannerThread = new Daemon(blockScanner);
-          blockScannerThread.start();
-        }
-            
-        //
-        // There is no work to do;  sleep until hearbeat timer elapses, 
-        // or work arrives, and then iterate again.
-        //
-        long waitTime = heartBeatInterval - (System.currentTimeMillis() - lastHeartbeat);
-        synchronized(receivedBlockList) {
-          if (waitTime > 0 && receivedBlockList.size() == 0) {
-            try {
-              receivedBlockList.wait(waitTime);
-            } catch (InterruptedException ie) {
-            }
-          }
-        } // synchronized
-      } catch(RemoteException re) {
-        String reClass = re.getClassName();
-        if (UnregisteredNodeException.class.getName().equals(reClass) ||
-            DisallowedDatanodeException.class.getName().equals(reClass) ||
-            IncorrectVersionException.class.getName().equals(reClass)) {
-          LOG.warn("DataNode is shutting down: " + 
-                   StringUtils.stringifyException(re));
-          shutdown();
-          return;
-        }
-        LOG.warn(StringUtils.stringifyException(re));
-        try {
-          long sleepTime = Math.min(1000, heartBeatInterval);
-          Thread.sleep(sleepTime);
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
-        }
-      } catch (IOException e) {
-        LOG.warn(StringUtils.stringifyException(e));
-      }
-    } // while (shouldRun)
-  } // offerService
-
-  /**
-   * Process an array of datanode commands
-   * 
-   * @param cmds an array of datanode commands
-   * @return true if further processing may be required or false otherwise. 
-   */
-  private boolean processCommand(DatanodeCommand[] cmds) {
-    if (cmds != null) {
-      for (DatanodeCommand cmd : cmds) {
-        try {
-          if (processCommand(cmd) == false) {
-            return false;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Error processing datanode Command", ioe);
-        }
-      }
-    }
-    return true;
-  }
-  
-    /**
-     * 
-     * @param cmd
-     * @return true if further processing may be required or false otherwise. 
-     * @throws IOException
-     */
-  private boolean processCommand(DatanodeCommand cmd) throws IOException {
-    if (cmd == null)
-      return true;
-    final BlockCommand bcmd = cmd instanceof BlockCommand? (BlockCommand)cmd: null;
-
-    switch(cmd.getAction()) {
-    case DatanodeProtocol.DNA_TRANSFER:
-      // Send a copy of a block to another datanode
-      transferBlocks(bcmd.getPoolId(), bcmd.getBlocks(), bcmd.getTargets());
-      myMetrics.blocksReplicated.inc(bcmd.getBlocks().length);
-      break;
-    case DatanodeProtocol.DNA_INVALIDATE:
-      //
-      // Some local block(s) are obsolete and can be 
-      // safely garbage-collected.
-      //
-      Block toDelete[] = bcmd.getBlocks();
-      try {
-        if (blockScanner != null) {
-          blockScanner.deleteBlocks(bcmd.getPoolId(), toDelete);
-        }
-        data.invalidate(bcmd.getPoolId(), toDelete);
-      } catch(IOException e) {
-        checkDiskError();
-        throw e;
-      }
-      myMetrics.blocksRemoved.inc(toDelete.length);
-      break;
-    case DatanodeProtocol.DNA_SHUTDOWN:
-      // shut down the data node
-      this.shutdown();
-      return false;
-    case DatanodeProtocol.DNA_REGISTER:
-      // namenode requested a registration - at start or if NN lost contact
-      LOG.info("DatanodeCommand action: DNA_REGISTER");
-      if (shouldRun) {
-        register();
-      }
-      break;
-    case DatanodeProtocol.DNA_FINALIZE:
-      storage.finalizeUpgrade(bcmd.getPoolId());
-      break;
-    case UpgradeCommand.UC_ACTION_START_UPGRADE:
-      // start distributed upgrade here
-      processDistributedUpgradeCommand((UpgradeCommand)cmd);
-      break;
-    case DatanodeProtocol.DNA_RECOVERBLOCK:
-      recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
-      break;
-    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
-      LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
-      if (isBlockTokenEnabled) {
-        blockTokenSecretManager.setKeys(((KeyUpdateCommand) cmd).getExportedKeys());
-      }
-      break;
-    default:
-      LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
-    }
-    return true;
-  }
-
   // Distributed upgrade manager
   UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
 
@@ -1068,92 +1427,6 @@ public class DataNode extends Configured
     assert upgradeManager != null : "DataNode.upgradeManager is null.";
     upgradeManager.processUpgradeCommand(comm);
   }
-
-  /**
-   * Report received blocks and delete hints to the Namenode
-   * @throws IOException
-   */
-  private void reportReceivedBlocks() throws IOException {
-    //check if there are newly received blocks
-    Block [] blockArray=null;
-    String [] delHintArray=null;
-    synchronized(receivedBlockList) {
-      synchronized(delHints){
-        int numBlocks = receivedBlockList.size();
-        if (numBlocks > 0) {
-          if(numBlocks!=delHints.size()) {
-            LOG.warn("Panic: receiveBlockList and delHints are not of the same length" );
-          }
-          //
-          // Send newly-received blockids to namenode
-          //
-          blockArray = receivedBlockList.toArray(new Block[numBlocks]);
-          delHintArray = delHints.toArray(new String[numBlocks]);
-        }
-      }
-    }
-    if (blockArray != null) {
-      if(delHintArray == null || delHintArray.length != blockArray.length ) {
-        LOG.warn("Panic: block array & delHintArray are not the same" );
-      }
-      // TODO:FEDERATION add support for pool ID
-      namenode.blockReceived(dnRegistration, "TODO", blockArray, delHintArray);
-      synchronized(receivedBlockList) {
-        synchronized(delHints){
-          for(int i=0; i<blockArray.length; i++) {
-            receivedBlockList.remove(blockArray[i]);
-            delHints.remove(delHintArray[i]);
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Report the list blocks to the Namenode
-   * @throws IOException
-   */
-  private DatanodeCommand blockReport() throws IOException {
-    // send block report
-    DatanodeCommand cmd = null;
-    long startTime = now();
-    if (startTime - lastBlockReport > blockReportInterval) {
-      //
-      // Send latest block report if timer has expired.
-      // Get back a list of local block(s) that are obsolete
-      // and can be safely GC'ed.
-      //
-      long brStartTime = now();
-      BlockListAsLongs bReport = data.getBlockReport();
-
-      // TODO:FEDERATION add support for pool ID
-      cmd = namenode.blockReport(dnRegistration, "TODO", bReport
-          .getBlockListAsLongs());
-      long brTime = now() - brStartTime;
-      myMetrics.blockReports.inc(brTime);
-      LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
-          " blocks got processed in " + brTime + " msecs");
-      //
-      // If we have sent the first block report, then wait a random
-      // time before we start the periodic block reports.
-      //
-      if (resetBlockReportTime) {
-        lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
-        resetBlockReportTime = false;
-      } else {
-        /* say the last block report was at 8:20:14. The current report
-         * should have started around 9:20:14 (default 1 hour interval).
-         * If current time is :
-         *   1) normal like 9:20:18, next report should be at 10:20:14
-         *   2) unexpected like 11:35:43, next report should be at 12:20:14
-         */
-        lastBlockReport += (now() - lastBlockReport) /
-                           blockReportInterval * blockReportInterval;
-      }
-    }
-    return cmd;
-  }
-
   /**
    * Start distributed upgrade if it should be initiated by the data-node.
    */
@@ -1220,29 +1493,6 @@ public class DataNode extends Configured
     }
   }
 
-  /*
-   * Informing the name node could take a long long time! Should we wait
-   * till namenode is informed before responding with success to the
-   * client? For now we don't.
-   */
-  protected void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
-    if(block==null || delHint==null) {
-      throw new IllegalArgumentException(block==null?"Block is null":"delHint is null");
-    }
-    synchronized (receivedBlockList) {
-      synchronized (delHints) {
-        // TODO:FEDERATION receivedBlockList should be per block pool
-        // TODO:FEDERATION use ExtendedBlock
-        receivedBlockList.add(block.getLocalBlock());
-        delHints.add(delHint);
-        receivedBlockList.notifyAll();
-      }
-    }
-  }
-
-  
-
-
   /* ********************************************************************
   Protocol when a client reads data from Datanode (Cur Ver: 9):
   
@@ -1412,64 +1662,41 @@ public class DataNode extends Configured
    */
   void closeBlock(ExtendedBlock block, String delHint) {
     myMetrics.blocksWritten.inc();
-    notifyNamenodeReceivedBlock(block, delHint);
+    BPOfferService bpos = nameNodeThreads[0];
+    // TODO:FEDERATION - find the corresponding bp - for now, for compilation, pick the first one
+    bpos.notifyNamenodeReceivedBlock(new ExtendedBlock(block), delHint);
     if (blockScanner != null) {
       blockScanner.addBlock(block);
     }
   }
 
-  /**
-   * No matter what kind of exception we get, keep retrying to offerService().
-   * That's the loop that connects to the NameNode and provides basic DataNode
-   * functionality.
-   *
-   * Only stop when "shouldRun" is turned off (which can only happen at shutdown).
+  /** Start a single datanode daemon and wait for it to finish.
+   *  If this thread is specifically interrupted, it will stop waiting.
    */
-  public void run() {
-    LOG.info(dnRegistration + "In DataNode.run, data = " + data);
+  public void runDatanodeDaemon() throws IOException {
+    if (nameNodeThreads != null) {
+      // Start namenode threads
+      for(BPOfferService bp : nameNodeThreads) {
+        bp.bpThread = new Thread(bp, dnThreadName);
+        bp.bpThread.setDaemon(true); // needed for JUnit testing
+        bp.bpThread.start();
+      }
+    }
 
     // start dataXceiveServer
     dataXceiverServer.start();
     ipcServer.start();
-        
-    while (shouldRun) {
-      try {
-        startDistributedUpgradeIfNeeded();
-        offerService();
-      } catch (Exception ex) {
-        LOG.error("Exception: " + StringUtils.stringifyException(ex));
-        if (shouldRun) {
-          try {
-            Thread.sleep(5000);
-          } catch (InterruptedException ie) {
-          }
-        }
-      }
-    }
-        
-    LOG.info(dnRegistration + ":Finishing DataNode in: "+data);
-    shutdown();
-  }
-    
-  /** Start a single datanode daemon and wait for it to finish.
-   *  If this thread is specifically interrupted, it will stop waiting.
-   */
-  public static void runDatanodeDaemon(DataNode dn) throws IOException {
-    if (dn != null) {
-      //register datanode
-      dn.register();
-      dn.dataNodeThread = new Thread(dn, dnThreadName);
-      dn.dataNodeThread.setDaemon(true); // needed for JUnit testing
-      dn.dataNodeThread.start();
-    }
+
+    postStartInit(conf, dataDirs);
   }
-  
+
   static boolean isDatanodeUp(DataNode dn) {
-    return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
+    //return dn.dataNodeThread != null && dn.dataNodeThread.isAlive();
+    return true; // TODO:FEDERATION - put the right condition
   }
 
   /** Instantiate a single datanode object. This must be run by invoking
-   *  {@link DataNode#runDatanodeDaemon(DataNode)} subsequently. 
+   *  {@link DataNode#runDatanodeDaemon()} subsequently. 
    */
   public static DataNode instantiateDataNode(String args[],
                                       Configuration conf) throws IOException {
@@ -1477,7 +1704,7 @@ public class DataNode extends Configured
   }
   
   /** Instantiate a single datanode object, along with its secure resources. 
-   * This must be run by invoking{@link DataNode#runDatanodeDaemon(DataNode)} 
+   * This must be run by invoking {@link DataNode#runDatanodeDaemon()} 
    * subsequently. 
    */
   public static DataNode instantiateDataNode(String args [], Configuration conf,
@@ -1530,15 +1757,17 @@ public class DataNode extends Configured
   public static DataNode createDataNode(String args[], Configuration conf,
       SecureResources resources) throws IOException {
     DataNode dn = instantiateDataNode(args, conf, resources);
-    runDatanodeDaemon(dn);
+    dn.runDatanodeDaemon();
     return dn;
   }
 
   void join() {
-    if (dataNodeThread != null) {
-      try {
-        dataNodeThread.join();
-      } catch (InterruptedException e) {}
+    // TODO:FEDERATION do not ignore InterruptedException
+    for(BPOfferService bpos : nameNodeThreads) {
+      if(bpos.bpThread != null) 
+        try {
+          bpos.bpThread.join();
+        } catch (InterruptedException e) {}
     }
   }
 
@@ -1646,16 +1875,13 @@ public class DataNode extends Configured
   }
 
   /**
-   * This methods  arranges for the data node to send the block report at the next heartbeat.
+   * This method arranges for the data node to send 
+   * the block report at the next heartbeat.
    */
-  public void scheduleBlockReport(long delay) {
-    if (delay > 0) { // send BR after random delay
-      lastBlockReport = System.currentTimeMillis()
-                            - ( blockReportInterval - R.nextInt((int)(delay)));
-    } else { // send at next heartbeat
-      lastBlockReport = lastHeartbeat - blockReportInterval;
+  public void scheduleAllBlockReport(long delay) {
+    for(BPOfferService bpos : nameNodeThreads) {
+      bpos.scheduleBlockReport(delay);
     }
-    resetBlockReportTime = true; // reset future BRs for randomness
   }
   
   
@@ -2001,7 +2227,7 @@ public class DataNode extends Configured
   
   @Override // DataNodeMXBean
   public String getClusterId() {
-    return this.storage.clusterID;
-}
+    return clusterId;
+  }
 
 }

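For readers tracking the thread-model change in DataNode.java above, here is a
minimal sketch of the pattern the patch moves to: one offer-service loop per
NameNode/block pool instead of the single DataNode.run() loop that was removed.
Only the BPOfferService name comes from the patch; the class bodies below are
illustrative stand-ins, not the committed code.

// Hedged sketch: one daemon thread per block pool, each retrying its own
// NameNode independently, as in the new runDatanodeDaemon() above.
import java.net.InetSocketAddress;
import java.util.List;

class BPOfferServiceSketch implements Runnable {
  final InetSocketAddress nnAddr;        // the NameNode for this block pool
  private volatile boolean shouldRun = true;

  BPOfferServiceSketch(InetSocketAddress nnAddr) {
    this.nnAddr = nnAddr;
  }

  public void run() {
    // Same retry contract as the old single-threaded loop: keep offering
    // service to this NameNode until shutdown, pausing briefly on failure.
    while (shouldRun) {
      try {
        offerService();                  // per-pool heartbeats and block reports
      } catch (Exception ex) {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException ie) {
          return;
        }
      }
    }
  }

  void offerService() throws Exception {
    // placeholder for the per-pool heartbeat/block-report loop
  }

  void stop() {
    shouldRun = false;
  }
}

class DataNodeSketch {
  void runDatanodeDaemon(List<InetSocketAddress> namenodes) {
    // one daemon thread per configured NameNode
    for (InetSocketAddress nn : namenodes) {
      Thread t = new Thread(new BPOfferServiceSketch(nn),
          "BP offer service to " + nn);
      t.setDaemon(true);                 // needed for JUnit testing, as above
      t.start();
    }
  }
}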
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Wed Feb 23 20:22:36 2011
@@ -87,6 +87,10 @@ public class DataStorage extends Storage
     storageID = "";
   }
   
+  public StorageInfo getBPStorage(String bpid) {
+    return bpStorageMap.get(bpid);
+  }
+  
   public DataStorage(StorageInfo storageInfo, String strgID) {
     super(NodeType.DATA_NODE, storageInfo);
     this.storageID = strgID;
@@ -118,7 +122,7 @@ public class DataStorage extends Storage
                              Collection<File> dataDirs,
                              StartupOption startOpt
                              ) throws IOException {
-    if (this.initilized) {
+    if (initilized) {
       // DN storage has been initialized, no need to do anything
       return;
     }
@@ -169,7 +173,7 @@ public class DataStorage extends Storage
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During sturtup some of them can upgrade or rollback 
+    // During startup some of them can upgrade or rollback 
     // while others could be uptodate for the regular startup.
     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
       doTransition(getStorageDir(idx), nsInfo, startOpt);
@@ -179,6 +183,12 @@ public class DataStorage extends Storage
         "Data-node and name-node CTimes must be the same.";
     }
     
+    // make sure we have a storage ID set; if not, generate a new one
+    if(storageID.isEmpty()) {
+      DataNode.setNewStorageID(DataNode.datanodeObject.dnRegistration);
+      storageID = DataNode.datanodeObject.dnRegistration.storageID;
+    }
+    
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
     
@@ -210,7 +220,8 @@ public class DataStorage extends Storage
     // mkdir for the list of BlockPoolStorage
     makeBlockPoolDataDir(bpDataDirs, null);
     BlockPoolStorage bpStorage = new BlockPoolStorage(nsInfo.getNamespaceID(), 
-        bpID, nsInfo.getCTime());
+        bpID, nsInfo.getCTime(), nsInfo.getClusterID());
+    
     bpStorage.recoverTransitionRead(nsInfo, bpDataDirs, startOpt);
     addBlockPoolStorage(bpID, bpStorage);
   }
@@ -447,7 +458,7 @@ public class DataStorage extends Storage
     // 3. Format BP and hard link blocks from previous directory
     File curBpDir = getBpRoot(nsInfo.getBlockPoolID(), curDir);
     BlockPoolStorage bpStorage = new BlockPoolStorage(nsInfo.getNamespaceID(), 
-        nsInfo.getBlockPoolID(), nsInfo.getCTime());
+        nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID());
     bpStorage.format(new StorageDirectory(curBpDir), nsInfo);
     linkAllBlocks(tmpDir, curBpDir);
     

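The getBPStorage() accessor added above implies a map from block pool ID to
that pool's storage state. A hedged sketch of the bookkeeping follows, assuming
only what the patch shows (the bpStorageMap field and addBlockPoolStorage(),
with an illustrative stand-in for StorageInfo):

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for org.apache.hadoop.hdfs.server.common.StorageInfo.
class StorageInfoSketch {
  int namespaceID;
  long cTime;
  String clusterID;
}

class DataStorageSketch {
  // one entry per block pool this DataNode serves, keyed by block pool ID
  private final Map<String, StorageInfoSketch> bpStorageMap =
      new HashMap<String, StorageInfoSketch>();

  void addBlockPoolStorage(String bpid, StorageInfoSketch bpStorage) {
    bpStorageMap.put(bpid, bpStorage);
  }

  StorageInfoSketch getBPStorage(String bpid) {
    return bpStorageMap.get(bpid);       // null if the pool is unknown here
  }
}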
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Feb 23 20:22:36 2011
@@ -881,6 +881,8 @@ public class FSDataset implements FSCons
     }
     volumes = new FSVolumeSet(volArray);
     volumes.getVolumeMap(volumeMap);
+
+    // TODO:FEDERATION this needs to be moved to addStorage()
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Wed Feb 23 20:22:36 2011
@@ -722,6 +722,7 @@ public class FSImage extends Storage {
     blockpoolID = bpid;
   }
   
+  @Override
   protected void getFields(Properties props, 
                            StorageDirectory sd 
                            ) throws IOException {
@@ -774,6 +775,7 @@ public class FSImage extends Storage {
    * @param sd storage directory
    * @throws IOException
    */
+  @Override
   protected void setFields(Properties props, 
                            StorageDirectory sd 
                            ) throws IOException {
@@ -2338,7 +2340,7 @@ public class FSImage extends Storage {
     U_STR.write(out);
   }
 
-  String getBlockPoolID() {
+  public String getBlockPoolID() {
     return blockpoolID;
   }
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Feb 23 20:22:36 2011
@@ -525,11 +525,12 @@ public class FSNamesystem implements FSC
   }
   
   NamespaceInfo getNamespaceInfo() {
-    return new NamespaceInfo(dir.fsImage.getNamespaceID(),
-                             dir.fsImage.getClusterID(),
-                             dir.fsImage.getBlockPoolID(),
+    NamespaceInfo nsinfo = new NamespaceInfo(dir.fsImage.getNamespaceID(),
+                             getClusterId(),
+                             getBlockpoolId(),
                              dir.fsImage.getCTime(),
                              getDistributedUpgradeVersion());
+    return nsinfo;
   }
 
   /**
@@ -2502,7 +2503,7 @@ public class FSNamesystem implements FSC
    * namespaceID and will continue serving the datanodes that has previously
    * registered with the namenode without restarting the whole cluster.
    * 
-   * @see org.apache.hadoop.hdfs.server.datanode.DataNode#register()
+   * @see org.apache.hadoop.hdfs.server.datanode.DataNode
    */
   public void registerDatanode(DatanodeRegistration nodeReg
                                             ) throws IOException {

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Feb 23 20:22:36 2011
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.AbstractList;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,9 +33,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
@@ -46,12 +48,12 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -217,6 +219,51 @@ public class NameNode implements Namenod
   public static InetSocketAddress getAddress(String address) {
     return NetUtils.createSocketAddr(address, DEFAULT_PORT);
   }
+  
+  /**
+   * TODO:FEDERATION
+   * At this moment only supports fs.default style entries.
+   * @param conf
+   * @return array of namenodes' addresses
+   */
+  public static InetSocketAddress [] getNNAddresses(Configuration conf) 
+  throws IOException {
+    URI[] nns=getNameNodesURIs(conf);
+    if(nns == null) {
+      throw new IOException("Federation namnodes are not configured correctly");
+    }
+
+    InetSocketAddress [] isas = new InetSocketAddress[nns.length];
+    int i=0;
+    for(URI u : nns) {
+      isas[i++] = getAddress(u); 
+    }
+    return isas;
+  }
+
+  /**
+   * TODO:FEDERATION
+   * get the list of namenodes from the configuration
+   * create URI for each one of them
+   * @param conf
+   * @return list of URIs of all configured NameNodes
+   */
+  public static URI [] getNameNodesURIs(Configuration conf) {
+    String [] nnURIs = conf.getStrings(DFSConfigKeys.DFS_FEDERATION_NAMENODES);
+    if(nnURIs == null) {
+      nnURIs = new String[] { conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)};
+    }
+
+    AbstractList<URI> nns = new ArrayList<URI>(nnURIs.length);
+    for(String uri : nnURIs) {
+      // name should be prepended with FileSystem.fixName(uri)
+      // TBD
+      nns.add(URI.create(uri));
+    }
+
+    URI[] r = new URI[nns.size()];
+    return nns.toArray(r);
+  }
 
   /**
    * Set the configuration property for the service rpc address
@@ -246,6 +293,15 @@ public class NameNode implements Namenod
 
   public static InetSocketAddress getAddress(Configuration conf) {
     URI filesystemURI = FileSystem.getDefaultUri(conf);
+    return getAddress(filesystemURI);
+  }
+
+
+  /**
+   * TODO:FEDERATION
+   * @param filesystemURI
+   */
+  public static InetSocketAddress getAddress(URI filesystemURI) {
     String authority = filesystemURI.getAuthority();
     if (authority == null) {
       throw new IllegalArgumentException(String.format(

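To make the new lookup concrete, here is a hedged usage sketch for
getNNAddresses() with two federated NameNodes. The key is set through the
DFSConfigKeys constant because its literal string value is not shown in this
patch, and the hostnames are illustrative.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

public class FederatedNNLookupExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // comma-separated list of NameNode URIs, matching what the new test sets
    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES,
        "hdfs://nn1.example.com:9000,hdfs://nn2.example.com:9000");
    for (InetSocketAddress isa : NameNode.getNNAddresses(conf)) {
      System.out.println("federated NameNode at " + isa);
    }
  }
}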
Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Wed Feb 23 20:22:36 2011
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.BlockPoolStorage;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
@@ -77,9 +77,8 @@ implements Writable, NodeRegistration {
     this.ipcPort = ipcPort;
   }
 
-  public void setStorageInfo(DataStorage storage) {
+  public void setStorageInfo(StorageInfo storage) {
     this.storageInfo = new StorageInfo(storage);
-    this.storageID = storage.getStorageID();
   }
   
   public void setName(String name) {
@@ -108,6 +107,7 @@ implements Writable, NodeRegistration {
       + ", storageID=" + storageID
       + ", infoPort=" + infoPort
       + ", ipcPort=" + ipcPort
+      + ", storageInfo=" + storageInfo
       + ")";
   }
 

Modified: hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java Wed Feb 23 20:22:36 2011
@@ -94,4 +94,8 @@ public class NamespaceInfo extends Stora
     distributedUpgradeVersion = in.readInt();
     blockPoolID = WritableUtils.readString(in);
   }
+  
+  public String toString() {
+    return super.toString() + ";bpid=" + blockPoolID;
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1073927&r1=1073926&r2=1073927&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java Wed Feb 23 20:22:36 2011
@@ -629,7 +629,7 @@ public class MiniDFSCluster {
         StaticMapping.addNodeToRack(ipAddr + ":" + port,
                                   racks[i-curDatanodesNum]);
       }
-      DataNode.runDatanodeDaemon(dn);
+      dn.runDatanodeDaemon();
       dataNodes.add(new DataNodeProperties(dn, newconf, dnArgs));
     }
     curDatanodesNum += numDataNodes;
@@ -1137,7 +1137,7 @@ public class MiniDFSCluster {
     String bpid = getNamesystem().getPoolId();
     SimulatedFSDataset sdataset = (SimulatedFSDataset) dataSet;
     sdataset.injectBlocks(bpid, blocksToInject);
-    dataNodes.get(dataNodeIndex).datanode.scheduleBlockReport(0);
+    dataNodes.get(dataNodeIndex).datanode.scheduleAllBlockReport(0);
   }
   
   /**

Added: hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java?rev=1073927&view=auto
==============================================================================
--- hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java (added)
+++ hadoop/hdfs/branches/HDFS-1052/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java Wed Feb 23 20:22:36 2011
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestDataNodeMultipleRegistrations {
+  private static final Log LOG = 
+    LogFactory.getLog(TestDataNodeMultipleRegistrations.class);
+  File common_base_dir;
+  String localHost;
+  Configuration conf;
+
+  @Before
+  public void setUp() throws Exception {
+    common_base_dir = new File(MiniDFSCluster.getBaseDirectory());
+    if (common_base_dir != null) {
+      if (common_base_dir.exists() && !FileUtil.fullyDelete(common_base_dir)) {
+        throw new IOException("cannot get directory ready:"
+            + common_base_dir.getAbsolutePath());
+      }
+    }
+
+    conf = new HdfsConfiguration();
+    localHost = DNS.getDefaultHost(conf.get("dfs.datanode.dns.interface",
+        "default"), conf.get("dfs.datanode.dns.nameserver", "default"));
+
+    localHost = "127.0.0.1";
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, localHost);
+  }
+
+  NameNode startNameNode(Configuration conf, int nnPort) throws IOException {
+    // per nn base_dir
+    File base_dir = new File(common_base_dir, Integer.toString(nnPort));
+
+    boolean manageNameDfsDirs = true; // for now
+    boolean format = true; // for now
+    // disable service authorization
+    conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
+        false);
+
+    // Setup the NameNode configuration
+    conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, localHost + ":0");
+    if (manageNameDfsDirs) {
+      String name = fileAsURI(new File(base_dir, "name1")) + ","
+          + fileAsURI(new File(base_dir, "name2"));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, name);
+      String sname = fileAsURI(new File(base_dir, "namesecondary1")) + ","
+          + fileAsURI(new File(base_dir, "namesecondary2"));
+      conf.set(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_DIR_KEY, sname);
+    }
+
+    // Format and clean out DataNode directories
+    if (format) {
+      GenericTestUtils.formatNamenode(conf);
+    }
+
+    // Start the NameNode
+    String[] args = new String[] {};
+    return NameNode.createNameNode(args, conf);
+  }
+
+  public DataNode startDataNode(Configuration conf) throws IOException {
+    Configuration dnConf = new HdfsConfiguration(conf);
+    boolean manageDfsDirs = true; // for now
+    File data_dir = new File(common_base_dir, "data");
+    if (data_dir.exists() && !FileUtil.fullyDelete(data_dir)) {
+      throw new IOException("Cannot remove data directory: " + data_dir);
+    }
+
+    if (manageDfsDirs) {
+      File dir1 = new File(data_dir, "data1");
+      File dir2 = new File(data_dir, "data2");
+      dir1.mkdirs();
+      dir2.mkdirs();
+      if (!dir1.isDirectory() || !dir2.isDirectory()) {
+        throw new IOException(
+            "Mkdirs failed to create directory for DataNode: " + dir1 + " or "
+                + dir2);
+      }
+      String dirs = fileAsURI(dir1) + "," + fileAsURI(dir2);
+      dnConf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+      conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dirs);
+    }
+    LOG.debug("Starting DataNode " + " with "
+        + DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY + ": "
+        + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+
+    String[] dnArgs = null; // for now
+    DataNode dn = DataNode.instantiateDataNode(dnArgs, dnConf);
+    if (dn == null)
+      throw new IOException("Cannot start DataNode in "
+          + dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
+
+    dn.runDatanodeDaemon();
+    return dn;
+  }
+
+  /**
+   * Starts multiple NNs and a single DN, and verifies per-BP registrations
+   * and handshakes.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void test2NNRegistration() throws IOException {
+    NameNode nn1, nn2;
+    // figure out host name for DataNode
+    int nnPort = 9928;
+    String nnURL1 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
+    FileSystem.setDefaultUri(conf, nnURL1);
+    nn1 = startNameNode(conf, nnPort);
+    
+    nnPort = 9929;
+    String nnURL2 = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
+    FileSystem.setDefaultUri(conf, nnURL2);
+    nn2 = startNameNode(conf, nnPort);
+    
+    Assert.assertNotNull("cannot create nn1", nn1);
+    Assert.assertNotNull("cannot create nn2", nn2);
+    
+    String bpid1 = nn1.getFSImage().getBlockPoolID();
+    String bpid2 = nn2.getFSImage().getBlockPoolID();
+    String cid1 = nn1.getFSImage().getClusterID();
+    String cid2 = nn2.getFSImage().getClusterID();
+    int lv1 = nn1.getFSImage().getLayoutVersion();
+    int lv2 = nn2.getFSImage().getLayoutVersion();
+    int ns1 = nn1.getFSImage().namespaceID;
+    int ns2 = nn2.getFSImage().namespaceID;
+    Assert.assertNotSame("namespace ids should be different", ns1, ns2);
+    LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
+        + ";uri=" + nn1.getNameNodeAddress());
+    LOG.info("nn2: lv=" + lv2 + ";cid=" + cid2 + ";bpid=" + bpid2
+        + ";uri=" + nn2.getNameNodeAddress());
+
+    // now start the datanode...
+    String nns = nnURL1 + "," + nnURL2;
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nns);
+    DataNode dn = startDataNode(conf);
+    Assert.assertNotNull("failed to create DataNode", dn);
+    waitDataNodeUp(dn);
+
+    for (BPOfferService bpos : dn.nameNodeThreads) {
+      LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name
+          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr);
+    }
+    
+    BPOfferService bpos1 = dn.nameNodeThreads[0];
+    BPOfferService bpos2 = dn.nameNodeThreads[1];
+
+    Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1
+        .getNameNodeAddress());
+    Assert.assertEquals("wrong nn address", bpos2.nn_addr, nn2
+        .getNameNodeAddress());
+    Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
+    Assert.assertEquals("wrong bpid", bpos2.getBlockPoolId(), bpid2);
+    Assert.assertEquals("wrong cid", dn.getClusterId(), cid1);
+    Assert.assertEquals("cid should be same", cid2, cid1);
+    Assert.assertEquals("namespace should be same", bpos1.bpNSInfo.namespaceID,
+        ns1);
+    Assert.assertEquals("namespace should be same", bpos2.bpNSInfo.namespaceID,
+        ns2);
+
+    dn.shutdown();
+    shutdownNN(nn1);
+    nn1 = null;
+    shutdownNN(nn2);
+    nn2 = null;
+  }
+
+  /**
+   * Starts a single NN and a single DN and verifies registration and handshake.
+   * 
+   * @throws IOException
+   */
+  @Test
+  public void testFedSingleNN() throws IOException {
+    NameNode nn1;
+    int nnPort = 9927;
+    // figure out host name for DataNode
+    String nnURL = "hdfs://" + localHost + ":" + Integer.toString(nnPort);
+
+    FileSystem.setDefaultUri(conf, nnURL);
+    nn1 = startNameNode(conf, nnPort);
+    Assert.assertNotNull("cannot create nn1", nn1);
+
+    String bpid1 = nn1.getFSImage().getBlockPoolID();
+    String cid1 = nn1.getFSImage().getClusterID();
+    int lv1 = nn1.getFSImage().getLayoutVersion();
+    LOG.info("nn1: lv=" + lv1 + ";cid=" + cid1 + ";bpid=" + bpid1
+        + ";uri=" + nn1.getNameNodeAddress());
+
+    // now start the datanode...
+    String nns = nnURL;
+    conf.set(DFSConfigKeys.DFS_FEDERATION_NAMENODES, nns);
+
+    DataNode dn = startDataNode(conf);
+    Assert.assertNotNull("failed to create DataNode", dn);
+
+    waitDataNodeUp(dn);
+    // try block report
+
+    for (BPOfferService bpos : dn.nameNodeThreads) {
+      LOG.info("reg: bpid=" + "; name=" + bpos.bpRegistration.name
+          + "; sid=" + bpos.bpRegistration.storageID + "; nna=" + bpos.nn_addr);
+    }
+    BPOfferService bpos1 = dn.nameNodeThreads[0];
+    bpos1.lastBlockReport = 0;
+    DatanodeCommand cmd = bpos1.blockReport();
+
+    Assert.assertNotNull("cmd is null", cmd);
+
+    Assert.assertEquals("wrong nn address", bpos1.nn_addr, nn1
+        .getNameNodeAddress());
+    Assert.assertEquals("wrong bpid", bpos1.getBlockPoolId(), bpid1);
+    Assert.assertEquals("wrong cid", dn.getClusterId(), cid1);
+
+    dn.shutdown();
+    dn = null;
+    shutdownNN(nn1);
+    nn1 = null;
+  }
+
+  private void shutdownNN(NameNode nn) {
+    if (nn == null) {
+      return;
+    }
+    nn.stop();
+    nn.join();
+  }
+
+  public boolean isDnUp(DataNode dn) {
+    boolean up = dn.nameNodeThreads.length > 0;
+    for (BPOfferService bpos : dn.nameNodeThreads) {
+      up = up && bpos.initialized();
+    }
+    return up;
+  }
+
+  public void waitDataNodeUp(DataNode dn) {
+    // TODO: replace this poll-and-sleep loop with something smarter
+    while (!isDnUp(dn)) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+      }
+    }
+  }
+}