Posted to hdfs-commits@hadoop.apache.org by to...@apache.org on 2011/11/21 20:27:13 UTC

svn commit: r1204660 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/server/common/ src/test/java/org/apache/hadoop/hdfs/server/data...

Author: todd
Date: Mon Nov 21 19:27:12 2011
New Revision: 1204660

URL: http://svn.apache.org/viewvc?rev=1204660&view=rev
Log:
HDFS-2566. Move BPOfferService to be a non-inner class. Contributed by Todd Lipcon.
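
The move itself is mechanical, but it changes how the service reaches DataNode state: as an inner class it captured the enclosing instance implicitly, while as a top-level class it must be handed the DataNode explicitly, which is what the new file's constructor does with its dn field. A minimal sketch of the before/after pattern (simplified, hypothetical members; not the actual Hadoop code):

    import java.net.InetSocketAddress;

    // Before (sketch): an inner class reaches the enclosing DataNode
    // implicitly through DataNode.this.
    class DataNode {
      String storageId = "example-storage-id";

      class InnerOfferService implements Runnable {
        @Override
        public void run() {
          System.out.println("storage " + storageId); // implicit DataNode.this
        }
      }
    }

    // After (sketch): a top-level class takes its dependencies explicitly,
    // mirroring the new BPOfferService(InetSocketAddress, DataNode) constructor.
    class TopLevelOfferService implements Runnable {
      private final InetSocketAddress nnAddr; // namenode this service talks to
      private final DataNode dn;              // explicit back-reference

      TopLevelOfferService(InetSocketAddress nnAddr, DataNode dn) {
        this.nnAddr = nnAddr;
        this.dn = dn;
      }

      @Override
      public void run() {
        System.out.println("storage " + dn.storageId + " -> " + nnAddr);
      }
    }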

Added:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Nov 21 19:27:12 2011
@@ -28,6 +28,8 @@ Release 0.23.1 - UNRELEASED
     HDFS-2570. Add descriptions for dfs.*.https.address in hdfs-default.xml.
     (eli)
 
+    HDFS-2566. Move BPOfferService to be a non-inner class. (todd)
+
   OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)

Added: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1204660&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (added)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Mon Nov 21 19:27:12 2011
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.util.Collection;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.StringUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * A thread per namenode to perform:
+ * <ul>
+ * <li> Pre-registration handshake with namenode</li>
+ * <li> Registration with namenode</li>
+ * <li> Send periodic heartbeats to the namenode</li>
+ * <li> Handle commands received from the namenode</li>
+ * </ul>
+ */
+@InterfaceAudience.Private
+class BPOfferService implements Runnable {
+  static final Log LOG = DataNode.LOG;
+
+  final InetSocketAddress nnAddr;
+  
+  /**
+   * Information about the namespace that this service
+   * is registering with. This is assigned after
+   * the first phase of the handshake.
+   */
+  NamespaceInfo bpNSInfo;
+
+  /**
+   * The registration information for this block pool.
+   * This is assigned after the second phase of the
+   * handshake.
+   */
+  DatanodeRegistration bpRegistration;
+  
+  long lastBlockReport = 0;
+
+  boolean resetBlockReportTime = true;
+
+  Thread bpThread;
+  DatanodeProtocol bpNamenode;
+  private long lastHeartbeat = 0;
+  private volatile boolean initialized = false;
+  private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
+  private final LinkedList<String> delHints = new LinkedList<String>();
+  private volatile boolean shouldServiceRun = true;
+  UpgradeManagerDatanode upgradeManager = null;
+  private final DataNode dn;
+  private final DNConf dnConf;
+
+  BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
+    this.dn = dn;
+    this.nnAddr = nnAddr;
+    this.dnConf = dn.getDnConf();
+  }
+
+  /**
+   * Returns true if the BP thread has completed initialization of storage
+   * and has registered with the corresponding namenode.
+   * @return true if initialized
+   */
+  public boolean isInitialized() {
+    return initialized;
+  }
+  
+  public boolean isAlive() {
+    return shouldServiceRun && bpThread.isAlive();
+  }
+  
+  public String getBlockPoolId() {
+    if (bpNSInfo != null) {
+      return bpNSInfo.getBlockPoolID();
+    } else {
+      LOG.warn("Block pool ID needed, but service not yet registered with NN",
+          new Exception("trace"));
+      return null;
+    }
+  }
+  
+  public NamespaceInfo getNamespaceInfo() {
+    return bpNSInfo;
+  }
+  
+  public String toString() {
+    if (bpNSInfo == null) {
+      // If we haven't yet connected to our NN, we don't yet know our
+      // own block pool ID.
+      // If _none_ of the block pools have connected yet, we don't even
+      // know the storage ID of this DN.
+      String storageId = dn.getStorageId();
+      if (storageId == null || "".equals(storageId)) {
+        storageId = "unknown";
+      }
+      return "Block pool <registering> (storage id " + storageId +
+        ") connecting to " + nnAddr;
+    } else {
+      return "Block pool " + getBlockPoolId() +
+        " (storage id " + dn.getStorageId() +
+        ") registered with " + nnAddr;
+    }
+  }
+  
+  InetSocketAddress getNNSocketAddress() {
+    return nnAddr;
+  }
+
+  /**
+   * Used to inject a spy NN in the unit tests.
+   */
+  @VisibleForTesting
+  void setNameNode(DatanodeProtocol dnProtocol) {
+    bpNamenode = dnProtocol;
+  }
+
+  /**
+   * Perform the first part of the handshake with the NameNode.
+   * This calls <code>versionRequest</code> to determine the NN's
+   * namespace and version info. It automatically retries until
+   * the NN responds or the DN is shutting down.
+   * 
+   * @return the NamespaceInfo
+   * @throws IncorrectVersionException if the remote NN does not match
+   * this DN's version
+   */
+  NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
+    NamespaceInfo nsInfo = null;
+    while (shouldRun()) {
+      try {
+        nsInfo = bpNamenode.versionRequest();
+        LOG.debug(this + " received versionRequest response: " + nsInfo);
+        break;
+      } catch(SocketTimeoutException e) {  // namenode is busy
+        LOG.warn("Problem connecting to server: " + nnAddr);
+      } catch(IOException e ) {  // namenode is not available
+        LOG.warn("Problem connecting to server: " + nnAddr);
+      }
+      
+      // try again in a few seconds
+      sleepAndLogInterrupts(5000, "requesting version info from NN");
+    }
+    
+    if (nsInfo != null) {
+      checkNNVersion(nsInfo);        
+    }
+    return nsInfo;
+  }
+
+  private void checkNNVersion(NamespaceInfo nsInfo)
+      throws IncorrectVersionException {
+    // build and layout versions should match
+    String nsBuildVer = nsInfo.getBuildVersion();
+    String stBuildVer = Storage.getBuildVersion();
+    if (!nsBuildVer.equals(stBuildVer)) {
+      LOG.warn("Data-node and name-node build versions must be the same. " +
+        "Namenode build version: " + nsBuildVer + "; Datanode " +
+        "build version: " + stBuildVer);
+      throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
+    }
+
+    if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
+      LOG.warn("Data-node and name-node layout versions must be the same." +
+        " Expected: "+ HdfsConstants.LAYOUT_VERSION +
+        " actual "+ nsInfo.getLayoutVersion());
+      throw new IncorrectVersionException(
+          nsInfo.getLayoutVersion(), "namenode");
+    }
+  }
+
+  private void connectToNNAndHandshake() throws IOException {
+    // get NN proxy
+    bpNamenode = (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
+          DatanodeProtocol.versionID, nnAddr, dn.getConf());
+
+    // First phase of the handshake with NN - get the namespace
+    // info.
+    bpNSInfo = retrieveNamespaceInfo();
+    
+    // Now that we know the namespace ID, etc, we can pass this to the DN.
+    // The DN can now initialize its local storage if we are the
+    // first BP to handshake, etc.
+    dn.initBlockPool(this);
+    
+    // Second phase of the handshake with the NN.
+    register();
+  }
+  
+  /**
+   * This method arranges for the data node to send the block report at
+   * the next heartbeat.
+   */
+  void scheduleBlockReport(long delay) {
+    if (delay > 0) { // send BR after random delay
+      lastBlockReport = System.currentTimeMillis()
+          - (dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
+    } else { // send at next heartbeat
+      lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
+    }
+    resetBlockReportTime = true; // reset future BRs for randomness
+  }
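+
+  // Worked example (hypothetical numbers, for illustration): with
+  // blockReportInterval = 1h and delay = 10min, a random r in [0, 10min)
+  // is drawn and lastBlockReport becomes now - (1h - r). blockReport()
+  // triggers once now() - lastBlockReport exceeds the interval, i.e. about
+  // r milliseconds from now; with delay == 0 the report is already due at
+  // the next heartbeat.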
+
+  void reportBadBlocks(ExtendedBlock block) {
+    DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
+    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
+    
+    try {
+      bpNamenode.reportBadBlocks(blocks);  
+    } catch (IOException e){
+      /* One common reason is that NameNode could be in safe mode.
+       * Should we keep on retrying in that case?
+       */
+      LOG.warn("Failed to report bad block " + block + " to namenode", e);
+    }
+    
+  }
+  
+  /**
+   * Report received blocks and delete hints to the Namenode
+   * @throws IOException
+   */
+  private void reportReceivedBlocks() throws IOException {
+    //check if there are newly received blocks
+    Block[] blockArray = null;
+    String[] delHintArray = null;
+    synchronized(receivedBlockList) {
+      synchronized(delHints){
+        int numBlocks = receivedBlockList.size();
+        if (numBlocks > 0) {
+          if (numBlocks != delHints.size()) {
+            LOG.warn("Panic: receivedBlockList and delHints are not of " +
+                "the same length");
+          }
+          //
+          // Send newly-received blockids to namenode
+          //
+          blockArray = receivedBlockList.toArray(new Block[numBlocks]);
+          delHintArray = delHints.toArray(new String[numBlocks]);
+        }
+      }
+    }
+    if (blockArray != null) {
+      if (delHintArray == null || delHintArray.length != blockArray.length) {
+        LOG.warn("Panic: blockArray and delHintArray are not the same length");
+      }
+      bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
+          delHintArray);
+      synchronized(receivedBlockList) {
+        synchronized(delHints){
+          for(int i=0; i<blockArray.length; i++) {
+            receivedBlockList.remove(blockArray[i]);
+            delHints.remove(delHintArray[i]);
+          }
+        }
+      }
+    }
+  }
+
+  /*
+   * Informing the name node could take a long long time! Should we wait
+   * till namenode is informed before responding with success to the
+   * client? For now we don't.
+   */
+  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+    if(block==null || delHint==null) {
+      throw new IllegalArgumentException(
+          block==null?"Block is null":"delHint is null");
+    }
+    
+    if (!block.getBlockPoolId().equals(getBlockPoolId())) {
+      LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+          + getBlockPoolId());
+      return;
+    }
+    
+    synchronized (receivedBlockList) {
+      synchronized (delHints) {
+        receivedBlockList.add(block.getLocalBlock());
+        delHints.add(delHint);
+        receivedBlockList.notifyAll();
+      }
+    }
+  }
+
+
+  /**
+   * Report the list of blocks to the Namenode.
+   * @throws IOException
+   */
+  DatanodeCommand blockReport() throws IOException {
+    // send block report if timer has expired.
+    DatanodeCommand cmd = null;
+    long startTime = now();
+    if (startTime - lastBlockReport > dnConf.blockReportInterval) {
+
+      // Create block report
+      long brCreateStartTime = now();
+      BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
+
+      // Send block report
+      long brSendStartTime = now();
+      cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
+          .getBlockListAsLongs());
+
+      // Log the block report processing stats from Datanode perspective
+      long brSendCost = now() - brSendStartTime;
+      long brCreateCost = brSendStartTime - brCreateStartTime;
+      dn.metrics.addBlockReport(brSendCost);
+      LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
+          + " blocks took " + brCreateCost + " msec to generate and "
+          + brSendCost + " msecs for RPC and NN processing");
+
+      // If we have sent the first block report, then wait a random
+      // time before we start the periodic block reports.
+      if (resetBlockReportTime) {
+        lastBlockReport = startTime -
+            DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
+        resetBlockReportTime = false;
+      } else {
+        /* say the last block report was at 8:20:14. The current report
+         * should have started around 9:20:14 (default 1 hour interval).
+         * If current time is :
+         *   1) normal like 9:20:18, next report should be at 10:20:14
+         *   2) unexpected like 11:35:43, next report should be at 12:20:14
+         */
+        lastBlockReport += (now() - lastBlockReport) /
+            dnConf.blockReportInterval * dnConf.blockReportInterval;
+      }
+      LOG.info("Sent block report, processed command: " + cmd);
+    }
+    return cmd;
+  }
+  
+  
+  DatanodeCommand [] sendHeartBeat() throws IOException {
+    return bpNamenode.sendHeartbeat(bpRegistration,
+        dn.data.getCapacity(),
+        dn.data.getDfsUsed(),
+        dn.data.getRemaining(),
+        dn.data.getBlockPoolUsed(getBlockPoolId()),
+        dn.xmitsInProgress.get(),
+        dn.getXceiverCount(), dn.data.getNumFailedVolumes());
+  }
+  
+  //This must be called only by blockPoolManager
+  void start() {
+    if ((bpThread != null) && (bpThread.isAlive())) {
+      //Thread is started already
+      return;
+    }
+    bpThread = new Thread(this, formatThreadName());
+    bpThread.setDaemon(true); // needed for JUnit testing
+    bpThread.start();
+  }
+  
+  private String formatThreadName() {
+    Collection<URI> dataDirs = DataNode.getStorageDirs(dn.getConf());
+    return "DataNode: [" +
+      StringUtils.uriToString(dataDirs.toArray(new URI[0])) +
+      "] heartbeating to " + nnAddr;
+  }
+
+  //This must be called only by blockPoolManager.
+  void stop() {
+    shouldServiceRun = false;
+    if (bpThread != null) {
+      bpThread.interrupt();
+    }
+  }
+  
+  //This must be called only by blockPoolManager
+  void join() {
+    try {
+      if (bpThread != null) {
+        bpThread.join();
+      }
+    } catch (InterruptedException ie) { }
+  }
+  
+  //Cleanup method to be called by current thread before exiting.
+  private synchronized void cleanUp() {
+    
+    if(upgradeManager != null)
+      upgradeManager.shutdownUpgrade();
+    shouldServiceRun = false;
+    RPC.stopProxy(bpNamenode);
+    dn.shutdownBlockPool(this);
+  }
+
+  /**
+   * Main loop for each BP thread. Run until shutdown,
+   * forever calling remote NameNode functions.
+   */
+  private void offerService() throws Exception {
+    LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+        + dnConf.blockReportInterval + "msec; initial delay: "
+        + dnConf.initialBlockReportDelay + "msec; heartBeatInterval="
+        + dnConf.heartBeatInterval);
+
+    //
+    // Now loop for a long time....
+    //
+    while (shouldRun()) {
+      try {
+        long startTime = now();
+
+        //
+        // Every so often, send heartbeat or block-report
+        //
+        if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
+          //
+          // All heartbeat messages include the following info:
+          // -- Datanode name
+          // -- data transfer port
+          // -- Total capacity
+          // -- Bytes remaining
+          //
+          lastHeartbeat = startTime;
+          if (!dn.areHeartbeatsDisabledForTests()) {
+            DatanodeCommand[] cmds = sendHeartBeat();
+            dn.metrics.addHeartbeat(now() - startTime);
+
+            long startProcessCommands = now();
+            if (!processCommand(cmds))
+              continue;
+            long endProcessCommands = now();
+            if (endProcessCommands - startProcessCommands > 2000) {
+              LOG.info("Took " + (endProcessCommands - startProcessCommands) +
+                  "ms to process " + cmds.length + " commands from NN");
+            }
+          }
+        }
+
+        reportReceivedBlocks();
+
+        DatanodeCommand cmd = blockReport();
+        processCommand(cmd);
+
+        // Now safe to start scanning the block pool
+        if (dn.blockScanner != null) {
+          dn.blockScanner.addBlockPool(this.getBlockPoolId());
+        }
+
+        //
+        // There is no work to do; sleep until the heartbeat timer elapses,
+        // or work arrives, and then iterate again.
+        //
+        long waitTime = dnConf.heartBeatInterval -
+            (System.currentTimeMillis() - lastHeartbeat);
+        synchronized(receivedBlockList) {
+          if (waitTime > 0 && receivedBlockList.size() == 0) {
+            try {
+              receivedBlockList.wait(waitTime);
+            } catch (InterruptedException ie) {
+              LOG.warn("BPOfferService for " + this + " interrupted");
+            }
+          }
+        } // synchronized
+      } catch(RemoteException re) {
+        String reClass = re.getClassName();
+        if (UnregisteredNodeException.class.getName().equals(reClass) ||
+            DisallowedDatanodeException.class.getName().equals(reClass) ||
+            IncorrectVersionException.class.getName().equals(reClass)) {
+          LOG.warn(this + " is shutting down", re);
+          shouldServiceRun = false;
+          return;
+        }
+        LOG.warn("RemoteException in offerService", re);
+        try {
+          long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
+          Thread.sleep(sleepTime);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+        }
+      } catch (IOException e) {
+        LOG.warn("IOException in offerService", e);
+      }
+    } // while (shouldRun())
+  } // offerService
+
+  /**
+   * Register one block pool with the corresponding NameNode
+   * <p>
+   * The datanode needs to register with the namenode on startup in order
+   * 1) to report which storage it is serving now and
+   * 2) to receive a registrationID issued by the namenode
+   *    to recognize registered datanodes.
+   * 
+   * @see FSNamesystem#registerDatanode(DatanodeRegistration)
+   * @throws IOException
+   */
+  void register() throws IOException {
+    Preconditions.checkState(bpNSInfo != null,
+        "register() should be called after handshake()");
+    
+    // The handshake() phase loaded the block pool storage
+    // off disk - so update the bpRegistration object from that info
+    bpRegistration = dn.createBPRegistration(bpNSInfo);
+
+    LOG.info(this + " beginning handshake with NN");
+
+    while (shouldRun()) {
+      try {
+        // Use returned registration from namenode with updated machine name.
+        bpRegistration = bpNamenode.registerDatanode(bpRegistration);
+        break;
+      } catch(SocketTimeoutException e) {  // namenode is busy
+        LOG.info("Problem connecting to server: " + nnAddr);
+        sleepAndLogInterrupts(1000, "connecting to server");
+      }
+    }
+    
+    LOG.info(this + " successfully registered with NN");
+    dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
+
+    // random short delay - helps scatter the BR from all DNs
+    scheduleBlockReport(dnConf.initialBlockReportDelay);
+  }
+
+
+  private void sleepAndLogInterrupts(int millis,
+      String stateString) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException ie) {
+      LOG.info("BPOfferService " + this +
+          " interrupted while " + stateString);
+    }
+  }
+
+  /**
+   * No matter what kind of exception we get, keep retrying offerService().
+   * That's the loop that connects to the NameNode and provides basic DataNode
+   * functionality.
+   *
+   * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
+   * happen either at shutdown or due to refreshNamenodes.
+   */
+  @Override
+  public void run() {
+    LOG.info(this + " starting to offer service");
+
+    try {
+      // init stuff
+      try {
+        // setup storage
+        connectToNNAndHandshake();
+      } catch (IOException ioe) {
+        // Initial handshake, storage recovery or registration failed
+        // End BPOfferService thread
+        LOG.fatal("Initialization failed for " + this, ioe);
+        return;
+      }
+
+      initialized = true; // bp is initialized;
+      
+      while (shouldRun()) {
+        try {
+          startDistributedUpgradeIfNeeded();
+          offerService();
+        } catch (Exception ex) {
+          LOG.error("Exception in BPOfferService for " + this, ex);
+          sleepAndLogInterrupts(5000, "offering service");
+        }
+      }
+    } catch (Throwable ex) {
+      LOG.warn("Unexpected exception in " + this, ex);
+    } finally {
+      LOG.warn("Ending block pool service for: " + this);
+      cleanUp();
+    }
+  }
+
+  private boolean shouldRun() {
+    return shouldServiceRun && dn.shouldRun();
+  }
+
+  /**
+   * Process an array of datanode commands
+   * 
+   * @param cmds an array of datanode commands
+   * @return true if further processing may be required or false otherwise. 
+   */
+  private boolean processCommand(DatanodeCommand[] cmds) {
+    if (cmds != null) {
+      for (DatanodeCommand cmd : cmds) {
+        try {
+          if (!processCommand(cmd)) {
+            return false;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Error processing datanode Command", ioe);
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Process a single datanode command.
+   * @param cmd the command to process, or null for a no-op
+   * @return true if further processing may be required or false otherwise. 
+   * @throws IOException
+   */
+  private boolean processCommand(DatanodeCommand cmd) throws IOException {
+    if (cmd == null)
+      return true;
+    final BlockCommand bcmd =
+        cmd instanceof BlockCommand ? (BlockCommand) cmd : null;
+
+    switch(cmd.getAction()) {
+    case DatanodeProtocol.DNA_TRANSFER:
+      // Send a copy of a block to another datanode
+      dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
+      dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
+      break;
+    case DatanodeProtocol.DNA_INVALIDATE:
+      //
+      // Some local block(s) are obsolete and can be 
+      // safely garbage-collected.
+      //
+      Block toDelete[] = bcmd.getBlocks();
+      try {
+        if (dn.blockScanner != null) {
+          dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
+        }
+        // using global fsdataset
+        dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
+      } catch(IOException e) {
+        dn.checkDiskError();
+        throw e;
+      }
+      dn.metrics.incrBlocksRemoved(toDelete.length);
+      break;
+    case DatanodeProtocol.DNA_SHUTDOWN:
+      // shut down the data node
+      shouldServiceRun = false;
+      return false;
+    case DatanodeProtocol.DNA_REGISTER:
+      // namenode requested a registration - at start or if NN lost contact
+      LOG.info("DatanodeCommand action: DNA_REGISTER");
+      if (shouldRun()) {
+        // re-retrieve namespace info to make sure that, if the NN
+        // was restarted, we still match its version (HDFS-2120)
+        retrieveNamespaceInfo();
+        // and re-register
+        register();
+      }
+      break;
+    case DatanodeProtocol.DNA_FINALIZE:
+      String bp = ((DatanodeCommand.Finalize) cmd).getBlockPoolId(); 
+      assert getBlockPoolId().equals(bp) :
+        "BP " + getBlockPoolId() + " received DNA_FINALIZE " +
+        "for other block pool " + bp;
+
+      dn.finalizeUpgradeForPool(bp);
+      break;
+    case UpgradeCommand.UC_ACTION_START_UPGRADE:
+      // start distributed upgrade here
+      processDistributedUpgradeCommand((UpgradeCommand)cmd);
+      break;
+    case DatanodeProtocol.DNA_RECOVERBLOCK:
+      dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
+      break;
+    case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
+      LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
+      if (dn.isBlockTokenEnabled) {
+        dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(), 
+            ((KeyUpdateCommand) cmd).getExportedKeys());
+      }
+      break;
+    case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+      LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
+      long bandwidth =
+          ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
+      if (bandwidth > 0) {
+        DataXceiverServer dxcs =
+            (DataXceiverServer) dn.dataXceiverServer.getRunnable();
+        dxcs.balanceThrottler.setBandwidth(bandwidth);
+      }
+      break;
+    default:
+      LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
+    }
+    return true;
+  }
+  
+  private void processDistributedUpgradeCommand(UpgradeCommand comm)
+      throws IOException {
+    UpgradeManagerDatanode upgradeManager = getUpgradeManager();
+    upgradeManager.processUpgradeCommand(comm);
+  }
+
+  synchronized UpgradeManagerDatanode getUpgradeManager() {
+    if (upgradeManager == null) {
+      upgradeManager = new UpgradeManagerDatanode(dn, getBlockPoolId());
+    }
+    return upgradeManager;
+  }
+  
+  /**
+   * Start distributed upgrade if it should be initiated by the data-node.
+   */
+  private void startDistributedUpgradeIfNeeded() throws IOException {
+    UpgradeManagerDatanode um = getUpgradeManager();
+    
+    if (!um.getUpgradeState()) {
+      return;
+    }
+    um.setUpgradeState(false, um.getUpgradeVersion());
+    um.startUpgrade();
+  }
+
+}
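
The start(), stop() and join() comments above say those methods must be called only by the DataNode's block pool manager. A minimal sketch of that lifecycle (hypothetical manager class, not the actual manager code in DataNode.java): one BPOfferService per configured namenode, started together, then stopped and joined on shutdown or refreshNamenodes.

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of the manager role; assumes it lives in the same
    // package so the package-private BPOfferService methods are visible.
    class BlockPoolManagerSketch {
      private final List<BPOfferService> services =
          new ArrayList<BPOfferService>();

      void startAll(DataNode dn, List<InetSocketAddress> nnAddrs) {
        for (InetSocketAddress nnAddr : nnAddrs) {
          BPOfferService bpos = new BPOfferService(nnAddr, dn);
          services.add(bpos);
          bpos.start(); // spawns the per-namenode daemon thread
        }
      }

      void shutdownAll() {
        for (BPOfferService bpos : services) {
          bpos.stop();  // flips shouldServiceRun and interrupts the thread
        }
        for (BPOfferService bpos : services) {
          bpos.join();  // then wait for each thread to exit
        }
      }
    }

Stopping every service before joining any of them lets the BP threads wind down in parallel rather than one at a time.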

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Mon Nov 21 19:27:12 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 

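Since BPOfferService is now a top-level class in the same
org.apache.hadoop.hdfs.server.datanode package as DataBlockScanner, the
nested-class import above is dropped with no replacement: the simple name
resolves package-locally. A sketch of the distinction (hypothetical class,
not the scanner's actual code):

    package org.apache.hadoop.hdfs.server.datanode;

    // Hypothetical illustration: after this commit, a class in this package
    // refers to the top-level BPOfferService with no import at all.
    class PackageLocalReferenceSketch {
      private BPOfferService bpos; // previously DataNode.BPOfferService
    }
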
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Nov 21 19:27:12 2011
@@ -50,7 +50,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEBHDFS_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -61,7 +60,6 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
-import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.channels.ServerSocketChannel;
@@ -74,7 +72,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -94,7 +91,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -103,7 +99,6 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@@ -117,31 +112,21 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
-import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
@@ -170,7 +155,6 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 
@@ -393,8 +377,6 @@ public class DataNode extends Configured
   
   private volatile String hostName; // Host name of this datanode
   
-  private static String dnThreadName;
-
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   
@@ -666,701 +648,8 @@ public class DataNode extends Configured
     this.heartbeatsDisabledForTests = heartbeatsDisabledForTests;
   }
   
-  /**
-   * A thread per namenode to perform:
-   * <ul>
-   * <li> Pre-registration handshake with namenode</li>
-   * <li> Registration with namenode</li>
-   * <li> Send periodic heartbeats to the namenode</li>
-   * <li> Handle commands received from the datanode</li>
-   * </ul>
-   */
-  @InterfaceAudience.Private
-  static class BPOfferService implements Runnable {
-    final InetSocketAddress nnAddr;
-    
-    /**
-     * Information about the namespace that this service
-     * is registering with. This is assigned after
-     * the first phase of the handshake.
-     */
-    NamespaceInfo bpNSInfo;
-
-    /**
-     * The registration information for this block pool.
-     * This is assigned after the second phase of the
-     * handshake.
-     */
-    DatanodeRegistration bpRegistration;
-    
-    long lastBlockReport = 0;
-
-    boolean resetBlockReportTime = true;
-
-    private Thread bpThread;
-    private DatanodeProtocol bpNamenode;
-    private long lastHeartbeat = 0;
-    private volatile boolean initialized = false;
-    private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-    private final LinkedList<String> delHints = new LinkedList<String>();
-    private volatile boolean shouldServiceRun = true;
-    UpgradeManagerDatanode upgradeManager = null;
-    private final DataNode dn;
-    private final DNConf dnConf;
-
-    BPOfferService(InetSocketAddress nnAddr, DataNode dn) {
-      this.dn = dn;
-      this.nnAddr = nnAddr;
-      this.dnConf = dn.getDnConf();
-    }
-
-    /**
-     * returns true if BP thread has completed initialization of storage
-     * and has registered with the corresponding namenode
-     * @return true if initialized
-     */
-    public boolean isInitialized() {
-      return initialized;
-    }
-    
-    public boolean isAlive() {
-      return shouldServiceRun && bpThread.isAlive();
-    }
-    
-    public String getBlockPoolId() {
-      if (bpNSInfo != null) {
-        return bpNSInfo.getBlockPoolID();
-      } else {
-        LOG.warn("Block pool ID needed, but service not yet registered with NN",
-            new Exception("trace"));
-        return null;
-      }
-    }
-    
-    public NamespaceInfo getNamespaceInfo() {
-      return bpNSInfo;
-    }
-    
-    public String toString() {
-      if (bpNSInfo == null) {
-        // If we haven't yet connected to our NN, we don't yet know our
-        // own block pool ID.
-        // If _none_ of the block pools have connected yet, we don't even
-        // know the storage ID of this DN.
-        String storageId = dn.getStorageId();
-        if (storageId == null || "".equals(storageId)) {
-          storageId = "unknown";
-        }
-        return "Block pool <registering> (storage id " + storageId +
-          ") connecting to " + nnAddr;
-      } else {
-        return "Block pool " + getBlockPoolId() +
-          " (storage id " + dn.getStorageId() +
-          ") registered with " + nnAddr;
-      }
-    }
-    
-    private InetSocketAddress getNNSocketAddress() {
-      return nnAddr;
-    }
- 
-    /**
-     * Used to inject a spy NN in the unit tests.
-     */
-    @VisibleForTesting
-    void setNameNode(DatanodeProtocol dnProtocol) {
-      bpNamenode = dnProtocol;
-    }
-
-    /**
-     * Perform the first part of the handshake with the NameNode.
-     * This calls <code>versionRequest</code> to determine the NN's
-     * namespace and version info. It automatically retries until
-     * the NN responds or the DN is shutting down.
-     * 
-     * @return the NamespaceInfo
-     * @throws IncorrectVersionException if the remote NN does not match
-     * this DN's version
-     */
-    NamespaceInfo retrieveNamespaceInfo() throws IncorrectVersionException {
-      NamespaceInfo nsInfo = null;
-      while (shouldRun()) {
-        try {
-          nsInfo = bpNamenode.versionRequest();
-          LOG.debug(this + " received versionRequest response: " + nsInfo);
-          break;
-        } catch(SocketTimeoutException e) {  // namenode is busy
-          LOG.warn("Problem connecting to server: " + nnAddr);
-        } catch(IOException e ) {  // namenode is not available
-          LOG.warn("Problem connecting to server: " + nnAddr);
-        }
-        
-        // try again in a second
-        sleepAndLogInterrupts(5000, "requesting version info from NN");
-      }
-      
-      if (nsInfo != null) {
-        checkNNVersion(nsInfo);        
-      }
-      return nsInfo;
-    }
-
-    private void checkNNVersion(NamespaceInfo nsInfo)
-        throws IncorrectVersionException {
-      // build and layout versions should match
-      String nsBuildVer = nsInfo.getBuildVersion();
-      String stBuildVer = Storage.getBuildVersion();
-      if (!nsBuildVer.equals(stBuildVer)) {
-        LOG.warn("Data-node and name-node Build versions must be the same. " +
-          "Namenode build version: " + nsBuildVer + "Datanode " +
-          "build version: " + stBuildVer);
-        throw new IncorrectVersionException(nsBuildVer, "namenode", stBuildVer);
-      }
-
-      if (HdfsConstants.LAYOUT_VERSION != nsInfo.getLayoutVersion()) {
-        LOG.warn("Data-node and name-node layout versions must be the same." +
-          " Expected: "+ HdfsConstants.LAYOUT_VERSION +
-          " actual "+ bpNSInfo.getLayoutVersion());
-        throw new IncorrectVersionException(
-            bpNSInfo.getLayoutVersion(), "namenode");
-      }
-    }
-
-    private void connectToNNAndHandshake() throws IOException {
-      // get NN proxy
-      bpNamenode = 
-        (DatanodeProtocol)RPC.waitForProxy(DatanodeProtocol.class,
-            DatanodeProtocol.versionID, nnAddr, dn.getConf());
-
-      // First phase of the handshake with NN - get the namespace
-      // info.
-      bpNSInfo = retrieveNamespaceInfo();
-      
-      // Now that we know the namespace ID, etc, we can pass this to the DN.
-      // The DN can now initialize its local storage if we are the
-      // first BP to handshake, etc.
-      dn.initBlockPool(this);
-      
-      // Second phase of the handshake with the NN.
-      register();
-    }
-    
-    /**
-     * This methods  arranges for the data node to send the block report at 
-     * the next heartbeat.
-     */
-    void scheduleBlockReport(long delay) {
-      if (delay > 0) { // send BR after random delay
-        lastBlockReport = System.currentTimeMillis()
-        - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
-      } else { // send at next heartbeat
-        lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
-      }
-      resetBlockReportTime = true; // reset future BRs for randomness
-    }
-
-    private void reportBadBlocks(ExtendedBlock block) {
-      DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-      LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) }; 
-      
-      try {
-        bpNamenode.reportBadBlocks(blocks);  
-      } catch (IOException e){
-        /* One common reason is that NameNode could be in safe mode.
-         * Should we keep on retrying in that case?
-         */
-        LOG.warn("Failed to report bad block " + block + " to namenode : "
-            + " Exception", e);
-      }
-      
-    }
-    
-    /**
-     * Report received blocks and delete hints to the Namenode
-     * @throws IOException
-     */
-    private void reportReceivedBlocks() throws IOException {
-      //check if there are newly received blocks
-      Block [] blockArray=null;
-      String [] delHintArray=null;
-      synchronized(receivedBlockList) {
-        synchronized(delHints){
-          int numBlocks = receivedBlockList.size();
-          if (numBlocks > 0) {
-            if(numBlocks!=delHints.size()) {
-              LOG.warn("Panic: receiveBlockList and delHints are not of " +
-              "the same length" );
-            }
-            //
-            // Send newly-received blockids to namenode
-            //
-            blockArray = receivedBlockList.toArray(new Block[numBlocks]);
-            delHintArray = delHints.toArray(new String[numBlocks]);
-          }
-        }
-      }
-      if (blockArray != null) {
-        if(delHintArray == null || delHintArray.length != blockArray.length ) {
-          LOG.warn("Panic: block array & delHintArray are not the same" );
-        }
-        bpNamenode.blockReceived(bpRegistration, getBlockPoolId(), blockArray,
-            delHintArray);
-        synchronized(receivedBlockList) {
-          synchronized(delHints){
-            for(int i=0; i<blockArray.length; i++) {
-              receivedBlockList.remove(blockArray[i]);
-              delHints.remove(delHintArray[i]);
-            }
-          }
-        }
-      }
-    }
-
-    /*
-     * Informing the name node could take a long long time! Should we wait
-     * till namenode is informed before responding with success to the
-     * client? For now we don't.
-     */
-    void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
-      if(block==null || delHint==null) {
-        throw new IllegalArgumentException(
-            block==null?"Block is null":"delHint is null");
-      }
-      
-      if (!block.getBlockPoolId().equals(getBlockPoolId())) {
-        LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + 
-            " vs. " + getBlockPoolId());
-        return;
-      }
-      
-      synchronized (receivedBlockList) {
-        synchronized (delHints) {
-          receivedBlockList.add(block.getLocalBlock());
-          delHints.add(delHint);
-          receivedBlockList.notifyAll();
-        }
-      }
-    }
-
-
-    /**
-     * Report the list blocks to the Namenode
-     * @throws IOException
-     */
-    DatanodeCommand blockReport() throws IOException {
-      // send block report if timer has expired.
-      DatanodeCommand cmd = null;
-      long startTime = now();
-      if (startTime - lastBlockReport > dnConf.blockReportInterval) {
-
-        // Create block report
-        long brCreateStartTime = now();
-        BlockListAsLongs bReport = dn.data.getBlockReport(getBlockPoolId());
-
-        // Send block report
-        long brSendStartTime = now();
-        cmd = bpNamenode.blockReport(bpRegistration, getBlockPoolId(), bReport
-            .getBlockListAsLongs());
-
-        // Log the block report processing stats from Datanode perspective
-        long brSendCost = now() - brSendStartTime;
-        long brCreateCost = brSendStartTime - brCreateStartTime;
-        dn.metrics.addBlockReport(brSendCost);
-        LOG.info("BlockReport of " + bReport.getNumberOfBlocks()
-            + " blocks took " + brCreateCost + " msec to generate and "
-            + brSendCost + " msecs for RPC and NN processing");
-
-        // If we have sent the first block report, then wait a random
-        // time before we start the periodic block reports.
-        if (resetBlockReportTime) {
-          lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(dnConf.blockReportInterval));
-          resetBlockReportTime = false;
-        } else {
-          /* say the last block report was at 8:20:14. The current report
-           * should have started around 9:20:14 (default 1 hour interval).
-           * If current time is :
-           *   1) normal like 9:20:18, next report should be at 10:20:14
-           *   2) unexpected like 11:35:43, next report should be at 12:20:14
-           */
-          lastBlockReport += (now() - lastBlockReport) /
-          dnConf.blockReportInterval * dnConf.blockReportInterval;
-        }
-        LOG.info("sent block report, processed command:" + cmd);
-      }
-      return cmd;
-    }
-    
-    
-    DatanodeCommand [] sendHeartBeat() throws IOException {
-      return bpNamenode.sendHeartbeat(bpRegistration,
-          dn.data.getCapacity(),
-          dn.data.getDfsUsed(),
-          dn.data.getRemaining(),
-          dn.data.getBlockPoolUsed(getBlockPoolId()),
-          dn.xmitsInProgress.get(),
-          dn.getXceiverCount(), dn.data.getNumFailedVolumes());
-    }
-    
-    //This must be called only by blockPoolManager
-    void start() {
-      if ((bpThread != null) && (bpThread.isAlive())) {
-        //Thread is started already
-        return;
-      }
-      bpThread = new Thread(this, dnThreadName);
-      bpThread.setDaemon(true); // needed for JUnit testing
-      bpThread.start();
-    }
-    
-    //This must be called only by blockPoolManager.
-    void stop() {
-      shouldServiceRun = false;
-      if (bpThread != null) {
-          bpThread.interrupt();
-      }
-    }
-    
-    //This must be called only by blockPoolManager
-    void join() {
-      try {
-        if (bpThread != null) {
-          bpThread.join();
-        }
-      } catch (InterruptedException ie) { }
-    }
-    
-    //Cleanup method to be called by current thread before exiting.
-    private synchronized void cleanUp() {
-      
-      if(upgradeManager != null)
-        upgradeManager.shutdownUpgrade();
-      shouldServiceRun = false;
-      RPC.stopProxy(bpNamenode);
-      dn.shutdownBlockPool(this);
-    }
-
-    /**
-     * Main loop for each BP thread. Run until shutdown,
-     * forever calling remote NameNode functions.
-     */
-    private void offerService() throws Exception {
-      LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
-          + dnConf.blockReportInterval + "msec" + " Initial delay: "
-          + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
-          + dnConf.heartBeatInterval);
-
-      //
-      // Now loop for a long time....
-      //
-      while (shouldRun()) {
-        try {
-          long startTime = now();
-
-          //
-          // Every so often, send heartbeat or block-report
-          //
-          if (startTime - lastHeartbeat > dnConf.heartBeatInterval) {
-            //
-            // All heartbeat messages include following info:
-            // -- Datanode name
-            // -- data transfer port
-            // -- Total capacity
-            // -- Bytes remaining
-            //
-            lastHeartbeat = startTime;
-            if (!dn.heartbeatsDisabledForTests) {
-              DatanodeCommand[] cmds = sendHeartBeat();
-              dn.metrics.addHeartbeat(now() - startTime);
-
-              long startProcessCommands = now();
-              if (!processCommand(cmds))
-                continue;
-              long endProcessCommands = now();
-              if (endProcessCommands - startProcessCommands > 2000) {
-                LOG.info("Took " + (endProcessCommands - startProcessCommands) +
-                    "ms to process " + cmds.length + " commands from NN");
-              }
-            }
-          }
-
-          reportReceivedBlocks();
-
-          DatanodeCommand cmd = blockReport();
-          processCommand(cmd);
-
-          // Now safe to start scanning the block pool
-          if (dn.blockScanner != null) {
-            dn.blockScanner.addBlockPool(this.getBlockPoolId());
-          }
-
-          //
-          // There is no work to do;  sleep until hearbeat timer elapses, 
-          // or work arrives, and then iterate again.
-          //
-          long waitTime = dnConf.heartBeatInterval - 
-          (System.currentTimeMillis() - lastHeartbeat);
-          synchronized(receivedBlockList) {
-            if (waitTime > 0 && receivedBlockList.size() == 0) {
-              try {
-                receivedBlockList.wait(waitTime);
-              } catch (InterruptedException ie) {
-                LOG.warn("BPOfferService for " + this + " interrupted");
-              }
-            }
-          } // synchronized
-        } catch(RemoteException re) {
-          String reClass = re.getClassName();
-          if (UnregisteredNodeException.class.getName().equals(reClass) ||
-              DisallowedDatanodeException.class.getName().equals(reClass) ||
-              IncorrectVersionException.class.getName().equals(reClass)) {
-            LOG.warn(this + " is shutting down", re);
-            shouldServiceRun = false;
-            return;
-          }
-          LOG.warn("RemoteException in offerService", re);
-          try {
-            long sleepTime = Math.min(1000, dnConf.heartBeatInterval);
-            Thread.sleep(sleepTime);
-          } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-          }
-        } catch (IOException e) {
-          LOG.warn("IOException in offerService", e);
-        }
-      } // while (shouldRun())
-    } // offerService
-
-    /**
-     * Register one bp with the corresponding NameNode
-     * <p>
-     * The bpDatanode needs to register with the namenode on startup in order
-     * 1) to report which storage it is serving now and 
-     * 2) to receive a registrationID
-     *  
-     * issued by the namenode to recognize registered datanodes.
-     * 
-     * @see FSNamesystem#registerDatanode(DatanodeRegistration)
-     * @throws IOException
-     */
-    void register() throws IOException {
-      Preconditions.checkState(bpNSInfo != null,
-          "register() should be called after handshake()");
-      
-      // The handshake() phase loaded the block pool storage
-      // off disk - so update the bpRegistration object from that info
-      bpRegistration = dn.createBPRegistration(bpNSInfo);
-
-      LOG.info(this + " beginning handshake with NN");
-
-      while (shouldRun()) {
-        try {
-          // Use returned registration from namenode with updated machine name.
-          bpRegistration = bpNamenode.registerDatanode(bpRegistration);
-          break;
-        } catch(SocketTimeoutException e) {  // namenode is busy
-          LOG.info("Problem connecting to server: " + nnAddr);
-          sleepAndLogInterrupts(1000, "connecting to server");
-        }
-      }
-      
-      LOG.info("Block pool " + this + " successfully registered with NN");
-      dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId());
-
-      // random short delay - helps scatter the BR from all DNs
-      scheduleBlockReport(dnConf.initialBlockReportDelay);
-    }
-
-
-    private void sleepAndLogInterrupts(int millis,
-        String stateString) {
-      try {
-        Thread.sleep(millis);
-      } catch (InterruptedException ie) {
-        LOG.info("BPOfferService " + this +
-            " interrupted while " + stateString);
-      }
-    }
-
-    /**
-     * No matter what kind of exception we get, keep retrying to offerService().
-     * That's the loop that connects to the NameNode and provides basic DataNode
-     * functionality.
-     *
-     * Only stop when "shouldRun" or "shouldServiceRun" is turned off, which can
-     * happen either at shutdown or due to refreshNamenodes.
-     */
-    @Override
-    public void run() {
-      LOG.info(this + " starting to offer service");
-
-      try {
-        // init stuff
-        try {
-          // setup storage
-          connectToNNAndHandshake();
-        } catch (IOException ioe) {
-          // Initial handshake, storage recovery or registration failed
-          // End BPOfferService thread
-          LOG.fatal("Initialization failed for block pool " + this, ioe);
-          return;
-        }
-
-        initialized = true; // bp is initialized;
-        
-        while (shouldRun()) {
-          try {
-            startDistributedUpgradeIfNeeded();
-            offerService();
-          } catch (Exception ex) {
-            LOG.error("Exception in BPOfferService for " + this, ex);
-            sleepAndLogInterrupts(5000, "offering service");
-          }
-        }
-      } catch (Throwable ex) {
-        LOG.warn("Unexpected exception in block pool " + this, ex);
-      } finally {
-        LOG.warn("Ending block pool service for: " + this);
-        cleanUp();
-      }
-    }
-
-    private boolean shouldRun() {
-      return shouldServiceRun && dn.shouldRun();
-    }
-
-    /**
-     * Process an array of datanode commands
-     * 
-     * @param cmds an array of datanode commands
-     * @return true if further processing may be required or false otherwise. 
-     */
-    private boolean processCommand(DatanodeCommand[] cmds) {
-      if (cmds != null) {
-        for (DatanodeCommand cmd : cmds) {
-          try {
-            if (!processCommand(cmd)) {
-              return false;
-            }
-          } catch (IOException ioe) {
-            LOG.warn("Error processing datanode Command", ioe);
-          }
-        }
-      }
-      return true;
-    }
-
-    /**
-     * Process a single datanode command.
-     *
-     * @param cmd the command to process, or null for a no-op
-     * @return true if further processing may be required or false otherwise.
-     * @throws IOException
-     */
-    private boolean processCommand(DatanodeCommand cmd) throws IOException {
-      if (cmd == null)
-        return true;
-      final BlockCommand bcmd = 
-        cmd instanceof BlockCommand? (BlockCommand)cmd: null;
-
-      switch(cmd.getAction()) {
-      case DatanodeProtocol.DNA_TRANSFER:
-        // Send a copy of a block to another datanode
-        dn.transferBlocks(bcmd.getBlockPoolId(), bcmd.getBlocks(), bcmd.getTargets());
-        dn.metrics.incrBlocksReplicated(bcmd.getBlocks().length);
-        break;
-      case DatanodeProtocol.DNA_INVALIDATE:
-        //
-        // Some local block(s) are obsolete and can be 
-        // safely garbage-collected.
-        //
-        Block[] toDelete = bcmd.getBlocks();
-        try {
-          if (dn.blockScanner != null) {
-            dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
-          }
-          // using global fsdataset
-          dn.data.invalidate(bcmd.getBlockPoolId(), toDelete);
-        } catch(IOException e) {
-          dn.checkDiskError();
-          throw e;
-        }
-        dn.metrics.incrBlocksRemoved(toDelete.length);
-        break;
-      case DatanodeProtocol.DNA_SHUTDOWN:
-        // shut down the data node
-        shouldServiceRun = false;
-        return false;
-      case DatanodeProtocol.DNA_REGISTER:
-        // namenode requested a registration - at start or if NN lost contact
-        LOG.info("DatanodeCommand action: DNA_REGISTER");
-        if (shouldRun()) {
-          // re-retrieve namespace info to make sure that, if the NN
-          // was restarted, we still match its version (HDFS-2120)
-          retrieveNamespaceInfo();
-          // and re-register
-          register();
-        }
-        break;
-      case DatanodeProtocol.DNA_FINALIZE:
-        dn.storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
-            .getBlockPoolId());
-        break;
-      case UpgradeCommand.UC_ACTION_START_UPGRADE:
-        // start distributed upgrade here
-        processDistributedUpgradeCommand((UpgradeCommand)cmd);
-        break;
-      case DatanodeProtocol.DNA_RECOVERBLOCK:
-        dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
-        break;
-      case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
-        LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
-        if (dn.isBlockTokenEnabled) {
-          dn.blockPoolTokenSecretManager.setKeys(getBlockPoolId(), 
-              ((KeyUpdateCommand) cmd).getExportedKeys());
-        }
-        break;
-      case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
-        LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
-        long bandwidth =
-                   ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
-        if (bandwidth > 0) {
-          DataXceiverServer dxcs =
-                       (DataXceiverServer) dn.dataXceiverServer.getRunnable();
-          dxcs.balanceThrottler.setBandwidth(bandwidth);
-        }
-        break;
-      default:
-        LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
-      }
-      return true;
-    }
-    
-    private void processDistributedUpgradeCommand(UpgradeCommand comm)
-        throws IOException {
-      UpgradeManagerDatanode upgradeManager = getUpgradeManager();
-      upgradeManager.processUpgradeCommand(comm);
-    }
-
-    synchronized UpgradeManagerDatanode getUpgradeManager() {
-      if (upgradeManager == null) {
-        upgradeManager =
-            new UpgradeManagerDatanode(dn, getBlockPoolId());
-      }
-      return upgradeManager;
-    }
-    
-    /**
-     * Start distributed upgrade if it should be initiated by the data-node.
-     */
-    private void startDistributedUpgradeIfNeeded() throws IOException {
-      UpgradeManagerDatanode um = getUpgradeManager();
-      if (!um.getUpgradeState()) {
-        return;
-      }
-      um.setUpgradeState(false, um.getUpgradeVersion());
-      um.startUpgrade();
-    }
-
+  boolean areHeartbeatsDisabledForTests() {
+    return this.heartbeatsDisabledForTests;
   }
 
   /**
@@ -1430,7 +719,7 @@ public class DataNode extends Configured
    * sets the storage ID based on this registration.
    * Also updates the block pool's state in the secret manager.
    */
-  private synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
+  synchronized void bpRegistrationSucceeded(DatanodeRegistration bpRegistration,
       String blockPoolId)
       throws IOException {
     hostName = bpRegistration.getHost();
@@ -1487,7 +776,7 @@ public class DataNode extends Configured
   /**
    * Remove the given block pool from the block scanner, dataset, and storage.
    */
-  private void shutdownBlockPool(BPOfferService bpos) {
+  void shutdownBlockPool(BPOfferService bpos) {
     blockPoolManager.remove(bpos);
 
     String bpId = bpos.getBlockPoolId();
@@ -1962,7 +1251,7 @@ public class DataNode extends Configured
     }
   }
 
-  private void transferBlocks(String poolId, Block blocks[],
+  void transferBlocks(String poolId, Block blocks[],
       DatanodeInfo xferTargets[][]) {
     for (int i = 0; i < blocks.length; i++) {
       try {
@@ -2254,9 +1543,7 @@ public class DataNode extends Configured
       System.exit(-1);
     }
     Collection<URI> dataDirs = getStorageDirs(conf);
-    dnThreadName = "DataNode: [" +
-                    StringUtils.uriToString(dataDirs.toArray(new URI[0])) + "]";
-   UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation.setConfiguration(conf);
     SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,
         DFS_DATANODE_USER_NAME_KEY);
     return makeInstance(dataDirs, conf, resources);
@@ -2788,6 +2075,14 @@ public class DataNode extends Configured
     }
   }
 
+  /**
+   * Finalize a pending upgrade in response to DNA_FINALIZE.
+   * @param blockPoolId the block pool to finalize
+   */
+  void finalizeUpgradeForPool(String blockPoolId) throws IOException {
+    storage.finalizeUpgrade(blockPoolId);
+  }
+
   // Determine a Datanode's streaming address
   public static InetSocketAddress getStreamingAddr(Configuration conf) {
     return NetUtils.createSocketAddr(

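The DataNode.java hunks above show the mechanics of the move: methods that BPOfferService used to reach through the implicit outer-class reference (bpRegistrationSucceeded, shutdownBlockPool, transferBlocks) are widened from private to package-private, and the now top-level class holds an explicit DataNode reference instead. A minimal sketch of that refactoring pattern, with hypothetical Outer/Collaborator names standing in for DataNode/BPOfferService:

// Before: an inner class silently uses the outer instance's private members.
class Outer {
  private void callback() { /* ... */ }

  class Inner {
    void work() { callback(); }     // really Outer.this.callback()
  }
}

// After: a top-level collaborator in the same package. The implicit
// Outer.this becomes an explicit constructor argument, and callback()
// is widened from private to package-private so it stays reachable.
class Outer2 {
  void callback() { /* ... */ }     // was private
}

class Collaborator {
  private final Outer2 outer;

  Collaborator(Outer2 outer) { this.outer = outer; }

  void work() { outer.callback(); }
}

Besides making the class independently testable (see the test changes below), keeping the widened methods package-private means nothing outside org.apache.hadoop.hdfs.server.datanode sees a visibility change.
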
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/common/TestDistributedUpgrade.java Mon Nov 21 19:27:12 2011
@@ -21,8 +21,6 @@ import static org.apache.hadoop.hdfs.pro
 
 import java.io.IOException;
 
-import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -37,9 +35,12 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 
+import org.junit.Test;
+import static org.junit.Assert.*;
+
 /**
  */
-public class TestDistributedUpgrade extends TestCase {
+public class TestDistributedUpgrade {
   private static final Log LOG = LogFactory.getLog(TestDistributedUpgrade.class);
   private Configuration conf;
   private int testCounter = 0;
@@ -93,6 +94,7 @@ public class TestDistributedUpgrade exte
  
   /**
    */
+  @Test(timeout=120000)
   public void testDistributedUpgrade() throws Exception {
     int numDirs = 1;
     TestDFSUpgradeFromImage testImg = new TestDFSUpgradeFromImage();

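The TestDistributedUpgrade change above is the standard JUnit 3 to JUnit 4 migration: drop "extends TestCase", statically import the Assert methods, and annotate each test method with @Test - here with a timeout so a hung upgrade fails the run instead of blocking it forever. The same shape on a hypothetical minimal test class:

import org.junit.Test;
import static org.junit.Assert.*;

public class ExampleMigratedTest {    // no longer "extends TestCase"

  @Test(timeout=120000)               // fail if the test exceeds two minutes
  public void testSomething() {
    assertEquals(4, 2 + 2);           // resolved via the static import
  }
}
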
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeExit.java Mon Nov 21 19:27:12 2011
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMultipleRegistrations.java Mon Nov 21 19:27:12 2011
@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.VolumeInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java Mon Nov 21 19:27:12 2011
@@ -29,7 +29,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -49,7 +48,7 @@ public class TestDatanodeRegister { 
     DataNode mockDN = mock(DataNode.class);
     Mockito.doReturn(true).when(mockDN).shouldRun();
     
-    BPOfferService bpos = new DataNode.BPOfferService(INVALID_ADDR, mockDN);
+    BPOfferService bpos = new BPOfferService(INVALID_ADDR, mockDN);
 
     NamespaceInfo fakeNSInfo = mock(NamespaceInfo.class);
     when(fakeNSInfo.getBuildVersion()).thenReturn("NSBuildVersion");

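Making BPOfferService top-level is also what lets TestDatanodeRegister construct one directly against a Mockito mock of DataNode, with no outer instance required. A sketch of that setup, assuming only the BPOfferService(InetSocketAddress, DataNode) constructor visible in the diff; the test class name and address are hypothetical:

import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.*;

import java.net.InetSocketAddress;

import org.junit.Test;

public class ExampleBPOfferServiceTest {

  @Test
  public void testConstructAgainstMockedDataNode() {
    DataNode mockDN = mock(DataNode.class);
    doReturn(true).when(mockDN).shouldRun();  // keep the service loop alive

    InetSocketAddress addr = new InetSocketAddress("127.0.0.1", 0);
    BPOfferService bpos = new BPOfferService(addr, mockDN);
    assertNotNull(bpos);
  }
}
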
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java?rev=1204660&r1=1204659&r2=1204660&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRefreshNamenodes.java Mon Nov 21 19:27:12 2011
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.DataNode.BPOfferService;
 import org.junit.Test;
 
 /**