Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/12/15 19:48:04 UTC

hadoop git commit: HDFS-9371. Code cleanup for DatanodeManager. Contributed by Jing Zhao.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0c3a53e5a -> 860269233


HDFS-9371. Code cleanup for DatanodeManager. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86026923
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86026923
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86026923

Branch: refs/heads/trunk
Commit: 8602692338d6f493647205e0241e4116211fab75
Parents: 0c3a53e
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Dec 15 10:47:53 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Dec 15 10:47:53 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../server/blockmanagement/BlockManager.java    |  20 +-
 .../blockmanagement/DatanodeDescriptor.java     |   8 +-
 .../server/blockmanagement/DatanodeManager.java | 413 +++++++++----------
 .../blockmanagement/HeartbeatManager.java       |   2 +
 .../hdfs/server/namenode/FSNamesystem.java      |  24 +-
 6 files changed, 230 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c2f6863..ae0fdc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -899,6 +899,8 @@ Release 2.9.0 - UNRELEASED
     HDFS-9281. Change TestDeleteBlockPool to not explicitly use File to check
     block pool existence. (lei)
 
+    HDFS-9371. Code cleanup for DatanodeManager. (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ae1238b..9296726 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -132,6 +132,9 @@ public class BlockManager implements BlockStatsMXBean {
   private final HeartbeatManager heartbeatManager;
   private final BlockTokenSecretManager blockTokenSecretManager;
 
+  // Block pool ID used by this namenode
+  private String blockPoolId;
+
   private final PendingDataNodeMessages pendingDNMessages =
     new PendingDataNodeMessages();
 
@@ -462,11 +465,16 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   public void setBlockPoolId(String blockPoolId) {
+    this.blockPoolId = blockPoolId;
     if (isBlockTokenEnabled()) {
       blockTokenSecretManager.setBlockPoolId(blockPoolId);
     }
   }
 
+  public String getBlockPoolId() {
+    return blockPoolId;
+  }
+
   public BlockStoragePolicySuite getStoragePolicySuite() {
     return storagePolicySuite;
   }
@@ -1229,18 +1237,6 @@ public class BlockManager implements BlockStatsMXBean {
   }
 
   /**
-   * Remove all block invalidation tasks under this datanode UUID;
-   * used when a datanode registers with a new UUID and the old one
-   * is wiped.
-   */
-  void removeFromInvalidates(final DatanodeInfo datanode) {
-    if (!isPopulatingReplQueues()) {
-      return;
-    }
-    invalidateBlocks.remove(datanode);
-  }
-
-  /**
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
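
This hunk moves ownership of the block pool ID from FSNamesystem into
BlockManager (the FSNamesystem side of the move appears at the end of this
patch). A minimal sketch of the resulting call path after the change --
bpid, eb and blk are hypothetical locals, while getBlockManager() and
getBlockPoolId() are the accessors shown in this diff:

    // FSNamesystem no longer caches its own copy of the pool id; it
    // delegates to BlockManager, which is now the single owner of the field.
    String bpid = namesystem.getBlockManager().getBlockPoolId();
    ExtendedBlock eb = new ExtendedBlock(bpid, blk);  // blk: some Block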

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index e5563eb..6709390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -290,11 +290,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.isAlive = isAlive;
   }
 
-  public boolean needKeyUpdate() {
+  public synchronized boolean needKeyUpdate() {
     return needKeyUpdate;
   }
 
-  public void setNeedKeyUpdate(boolean needKeyUpdate) {
+  public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) {
     this.needKeyUpdate = needKeyUpdate;
   }
 
@@ -868,14 +868,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * @return balancer bandwidth in bytes per second for this datanode
    */
-  public long getBalancerBandwidth() {
+  public synchronized long getBalancerBandwidth() {
     return this.bandwidth;
   }
 
   /**
    * @param bandwidth balancer bandwidth in bytes per second for this datanode
    */
-  public void setBalancerBandwidth(long bandwidth) {
+  public synchronized void setBalancerBandwidth(long bandwidth) {
     this.bandwidth = bandwidth;
   }
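
Before this change, needKeyUpdate and bandwidth were read and written from
different threads without synchronization. Synchronizing both the getter and
the setter of each pair creates the happens-before edge that makes a write
visible to the next read. A minimal standalone sketch of the rule being
applied (hypothetical class, not part of the patch):

    class Flag {
      private boolean value;               // guarded by "this"

      synchronized void set(boolean v) {   // writer, e.g. heartbeat thread
        value = v;
      }

      synchronized boolean get() {         // reader on another thread
        return value;                      // guaranteed to observe set()
      }
    }

Synchronizing only the setter would serialize writers but leave readers with
no visibility guarantee; for simple get/set pairs like these, a volatile
field would be an equally valid alternative.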
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index f758454..d535397 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY;
 import static org.apache.hadoop.util.Time.monotonicNow;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -162,7 +163,7 @@ public class DatanodeManager {
    * during rolling upgrades.
    * Software version -> Number of datanodes with this version
    */
-  private HashMap<String, Integer> datanodesSoftwareVersions =
+  private final HashMap<String, Integer> datanodesSoftwareVersions =
     new HashMap<>(4, 0.75f);
   
   /**
@@ -352,15 +353,9 @@ public class DatanodeManager {
   }
 
   private boolean isInactive(DatanodeInfo datanode) {
-    if (datanode.isDecommissioned()) {
-      return true;
-    }
+    return datanode.isDecommissioned() ||
+        (avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
 
-    if (avoidStaleDataNodesForRead) {
-      return datanode.isStale(staleInterval);
-    }
-      
-    return false;
   }
   
   /** Sort the located blocks by the distance to the target host. */
@@ -479,8 +474,9 @@ public class DatanodeManager {
     if (datanodeUuid == null) {
       return null;
     }
-
-    return datanodeMap.get(datanodeUuid);
+    synchronized (this) {
+      return datanodeMap.get(datanodeUuid);
+    }
   }
 
   /**
@@ -490,8 +486,8 @@ public class DatanodeManager {
    * @return DatanodeDescriptor or null if the node is not found.
    * @throws UnregisteredNodeException
    */
-  public DatanodeDescriptor getDatanode(DatanodeID nodeID
-      ) throws UnregisteredNodeException {
+  public DatanodeDescriptor getDatanode(DatanodeID nodeID)
+      throws UnregisteredNodeException {
     final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
     if (node == null) 
       return null;
@@ -535,13 +531,13 @@ public class DatanodeManager {
 
   /** Prints information about all datanodes. */
   void datanodeDump(final PrintWriter out) {
-    synchronized (datanodeMap) {
-      Map<String,DatanodeDescriptor> sortedDatanodeMap =
-          new TreeMap<>(datanodeMap);
-      out.println("Metasave: Number of datanodes: " + datanodeMap.size());
-      for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
-        out.println(node.dumpDatanode());
-      }
+    final Map<String,DatanodeDescriptor> sortedDatanodeMap;
+    synchronized (this) {
+      sortedDatanodeMap = new TreeMap<>(datanodeMap);
+    }
+    out.println("Metasave: Number of datanodes: " + sortedDatanodeMap.size());
+    for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
+      out.println(node.dumpDatanode());
     }
   }
 
@@ -567,8 +563,8 @@ public class DatanodeManager {
    * Remove a datanode
    * @throws UnregisteredNodeException 
    */
-  public void removeDatanode(final DatanodeID node
-      ) throws UnregisteredNodeException {
+  public void removeDatanode(final DatanodeID node)
+      throws UnregisteredNodeException {
     namesystem.writeLock();
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
@@ -585,19 +581,17 @@ public class DatanodeManager {
 
   /** Remove a dead datanode. */
   void removeDeadDatanode(final DatanodeID nodeID) {
-      synchronized(datanodeMap) {
-        DatanodeDescriptor d;
-        try {
-          d = getDatanode(nodeID);
-        } catch(IOException e) {
-          d = null;
-        }
-        if (d != null && isDatanodeDead(d)) {
-          NameNode.stateChangeLog.info(
-              "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
-          removeDatanode(d);
-        }
-      }
+    DatanodeDescriptor d;
+    try {
+      d = getDatanode(nodeID);
+    } catch(IOException e) {
+      d = null;
+    }
+    if (d != null && isDatanodeDead(d)) {
+      NameNode.stateChangeLog.info(
+          "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
+      removeDatanode(d);
+    }
   }
 
   /** Is the datanode dead? */
@@ -611,14 +605,13 @@ public class DatanodeManager {
     // To keep host2DatanodeMap consistent with datanodeMap,
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
-    synchronized(datanodeMap) {
+    synchronized(this) {
       host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
     }
 
     networktopology.add(node); // may throw InvalidTopologyException
     host2DatanodeMap.add(node);
     checkIfClusterIsNowMultiRack(node);
-    blockManager.getBlockReportLeaseManager().register(node);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -629,11 +622,9 @@ public class DatanodeManager {
   /** Physically remove node from datanodeMap. */
   private void wipeDatanode(final DatanodeID node) {
     final String key = node.getDatanodeUuid();
-    synchronized (datanodeMap) {
+    synchronized (this) {
       host2DatanodeMap.remove(datanodeMap.remove(key));
     }
-    // Also remove all block invalidation tasks under this node
-    blockManager.removeFromInvalidates(new DatanodeInfo(node));
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
           + node + "): storage " + key 
@@ -645,7 +636,7 @@ public class DatanodeManager {
     if (version == null) {
       return;
     }
-    synchronized(datanodeMap) {
+    synchronized(this) {
       Integer count = this.datanodesSoftwareVersions.get(version);
       count = count == null ? 1 : count + 1;
       this.datanodesSoftwareVersions.put(version, count);
@@ -656,7 +647,7 @@ public class DatanodeManager {
     if (version == null) {
       return;
     }
-    synchronized(datanodeMap) {
+    synchronized(this) {
       Integer count = this.datanodesSoftwareVersions.get(version);
       if(count != null) {
         if(count > 1) {
@@ -674,24 +665,22 @@ public class DatanodeManager {
   }
 
   private void countSoftwareVersions() {
-    synchronized(datanodeMap) {
-      HashMap<String, Integer> versionCount = new HashMap<>();
+    synchronized(this) {
+      datanodesSoftwareVersions.clear();
       for(DatanodeDescriptor dn: datanodeMap.values()) {
         // Check isAlive too because right after removeDatanode(),
         // isDatanodeDead() is still true 
-        if(shouldCountVersion(dn))
-        {
-          Integer num = versionCount.get(dn.getSoftwareVersion());
+        if (shouldCountVersion(dn)) {
+          Integer num = datanodesSoftwareVersions.get(dn.getSoftwareVersion());
           num = num == null ? 1 : num+1;
-          versionCount.put(dn.getSoftwareVersion(), num);
+          datanodesSoftwareVersions.put(dn.getSoftwareVersion(), num);
         }
       }
-      this.datanodesSoftwareVersions = versionCount;
     }
   }
 
   public HashMap<String, Integer> getDatanodesSoftwareVersions() {
-    synchronized(datanodeMap) {
+    synchronized(this) {
       return new HashMap<> (this.datanodesSoftwareVersions);
     }
   }
@@ -747,13 +736,11 @@ public class DatanodeManager {
   /**
    * Resolve network locations for specified hosts
    *
-   * @param names
    * @return Network locations if available, else returns null
    */
   public List<String> resolveNetworkLocation(List<String> names) {
     // resolve its network location
-    List<String> rName = dnsToSwitchMapping.resolve(names);
-    return rName;
+    return dnsToSwitchMapping.resolve(names);
   }
 
   /**
@@ -807,10 +794,9 @@ public class DatanodeManager {
    * This is used so as not to display a decommissioned datanode to the operators.
    * @param nodeList array list of live or dead nodes.
    */
-  private void removeDecomNodeFromList(
+  private static void removeDecomNodeFromList(
       final List<DatanodeDescriptor> nodeList) {
-    Iterator<DatanodeDescriptor> it=null;
-    for (it = nodeList.iterator(); it.hasNext();) {
+    for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
       DatanodeDescriptor node = it.next();
       if (node.isDecommissioned()) {
         it.remove();
@@ -968,6 +954,7 @@ public class DatanodeManager {
   
         // register new datanode
         addDatanode(nodeDescr);
+        blockManager.getBlockReportLeaseManager().register(nodeDescr);
         // also treat the registration message as a heartbeat
         // no need to update its timestamp
        // because it is done when the descriptor is created
@@ -1030,7 +1017,11 @@ public class DatanodeManager {
    * 4. Removed from exclude --> stop decommission.
    */
   private void refreshDatanodes() {
-    for(DatanodeDescriptor node : datanodeMap.values()) {
+    final Map<String, DatanodeDescriptor> copy;
+    synchronized (this) {
+      copy = new HashMap<>(datanodeMap);
+    }
+    for (DatanodeDescriptor node : copy.values()) {
       // Check if not include.
       if (!hostFileManager.isIncluded(node)) {
         node.setDisallowed(true); // case 2.
@@ -1047,7 +1038,7 @@ public class DatanodeManager {
   /** @return the number of live datanodes. */
   public int getNumLiveDataNodes() {
     int numLive = 0;
-    synchronized (datanodeMap) {
+    synchronized (this) {
       for(DatanodeDescriptor dn : datanodeMap.values()) {
         if (!isDatanodeDead(dn) ) {
           numLive++;
@@ -1252,7 +1243,7 @@ public class DatanodeManager {
     final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
     final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
 
-    synchronized(datanodeMap) {
+    synchronized(this) {
       nodes = new ArrayList<>(datanodeMap.size());
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         final boolean isDead = isDatanodeDead(dn);
@@ -1327,155 +1318,160 @@ public class DatanodeManager {
     node.setLastUpdateMonotonic(0);
   }
 
+  private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
+      DatanodeDescriptor nodeinfo) {
+    BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+    if (blocks == null) {
+      return null;
+    }
+    BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.length);
+    for (BlockInfo b : blocks) {
+      BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
+      assert uc != null;
+      final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+      // Skip stale nodes during recovery
+      final List<DatanodeStorageInfo> recoveryLocations =
+          new ArrayList<>(storages.length);
+      for (DatanodeStorageInfo storage : storages) {
+        if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
+          recoveryLocations.add(storage);
+        }
+      }
+      // If we are performing a truncate recovery, then set recovery fields
+      // to the old block.
+      boolean truncateRecovery = uc.getTruncateBlock() != null;
+      boolean copyOnTruncateRecovery = truncateRecovery &&
+          uc.getTruncateBlock().getBlockId() != b.getBlockId();
+      ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
+          new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
+          new ExtendedBlock(blockPoolId, b);
+      // If we only get 1 replica after eliminating stale nodes, choose all
+      // replicas for recovery and let the primary data node handle failures.
+      DatanodeInfo[] recoveryInfos;
+      if (recoveryLocations.size() > 1) {
+        if (recoveryLocations.size() != storages.length) {
+          LOG.info("Skipped stale nodes for recovery : "
+              + (storages.length - recoveryLocations.size()));
+        }
+        recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
+      } else {
+        // If too many replicas are stale, then choose all replicas to
+        // participate in block recovery.
+        recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
+      }
+      RecoveringBlock rBlock;
+      if (truncateRecovery) {
+        Block recoveryBlock = (copyOnTruncateRecovery) ? b : uc.getTruncateBlock();
+        rBlock = new RecoveringBlock(primaryBlock, recoveryInfos, recoveryBlock);
+      } else {
+        rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
+            uc.getBlockRecoveryId());
+      }
+      brCommand.add(rBlock);
+    }
+    return brCommand;
+  }
+
+  private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
+      List<DatanodeCommand> cmds) {
+    boolean sendingCachingCommands = false;
+    final long nowMs = monotonicNow();
+    if (shouldSendCachingCommands &&
+        ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
+            timeBetweenResendingCachingDirectivesMs)) {
+      DatanodeCommand pendingCacheCommand = getCacheCommand(
+          nodeinfo.getPendingCached(), DatanodeProtocol.DNA_CACHE,
+          blockPoolId);
+      if (pendingCacheCommand != null) {
+        cmds.add(pendingCacheCommand);
+        sendingCachingCommands = true;
+      }
+      DatanodeCommand pendingUncacheCommand = getCacheCommand(
+          nodeinfo.getPendingUncached(), DatanodeProtocol.DNA_UNCACHE,
+          blockPoolId);
+      if (pendingUncacheCommand != null) {
+        cmds.add(pendingUncacheCommand);
+        sendingCachingCommands = true;
+      }
+      if (sendingCachingCommands) {
+        nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
+      }
+    }
+  }
+
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, final String blockPoolId,
       long cacheCapacity, long cacheUsed, int xceiverCount, 
       int maxTransfers, int failedVolumes,
       VolumeFailureSummary volumeFailureSummary) throws IOException {
-    synchronized (heartbeatManager) {
-      synchronized (datanodeMap) {
-        DatanodeDescriptor nodeinfo;
-        try {
-          nodeinfo = getDatanode(nodeReg);
-        } catch(UnregisteredNodeException e) {
-          return new DatanodeCommand[]{RegisterCommand.REGISTER};
-        }
-        
-        // Check if this datanode should actually be shutdown instead. 
-        if (nodeinfo != null && nodeinfo.isDisallowed()) {
-          setDatanodeDead(nodeinfo);
-          throw new DisallowedDatanodeException(nodeinfo);
-        }
-
-        if (nodeinfo == null || !nodeinfo.isAlive()) {
-          return new DatanodeCommand[]{RegisterCommand.REGISTER};
-        }
-
-        heartbeatManager.updateHeartbeat(nodeinfo, reports,
-                                         cacheCapacity, cacheUsed,
-                                         xceiverCount, failedVolumes,
-                                         volumeFailureSummary);
+    final DatanodeDescriptor nodeinfo;
+    try {
+      nodeinfo = getDatanode(nodeReg);
+    } catch (UnregisteredNodeException e) {
+      return new DatanodeCommand[]{RegisterCommand.REGISTER};
+    }
 
-        // If we are in safemode, do not send back any recovery / replication
-        // requests. Don't even drain the existing queue of work.
-        if(namesystem.isInSafeMode()) {
-          return new DatanodeCommand[0];
-        }
+    // Check if this datanode should actually be shutdown instead.
+    if (nodeinfo != null && nodeinfo.isDisallowed()) {
+      setDatanodeDead(nodeinfo);
+      throw new DisallowedDatanodeException(nodeinfo);
+    }
 
-        //check lease recovery
-        BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
-        if (blocks != null) {
-          BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
-              blocks.length);
-          for (BlockInfo b : blocks) {
-            BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
-            assert uc != null;
-            final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
-            // Skip stale nodes during recovery - not heart beated for some time (30s by default).
-            final List<DatanodeStorageInfo> recoveryLocations =
-                new ArrayList<>(storages.length);
-            for (DatanodeStorageInfo storage : storages) {
-              if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
-                recoveryLocations.add(storage);
-              }
-            }
-            // If we are performing a truncate recovery than set recovery fields
-            // to old block.
-            boolean truncateRecovery = uc.getTruncateBlock() != null;
-            boolean copyOnTruncateRecovery = truncateRecovery &&
-                uc.getTruncateBlock().getBlockId() != b.getBlockId();
-            ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
-                new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
-                new ExtendedBlock(blockPoolId, b);
-            // If we only get 1 replica after eliminating stale nodes, then choose all
-            // replicas for recovery and let the primary data node handle failures.
-            DatanodeInfo[] recoveryInfos;
-            if (recoveryLocations.size() > 1) {
-              if (recoveryLocations.size() != storages.length) {
-                LOG.info("Skipped stale nodes for recovery : " +
-                    (storages.length - recoveryLocations.size()));
-              }
-              recoveryInfos =
-                  DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
-            } else {
-              // If too many replicas are stale, then choose all replicas to participate
-              // in block recovery.
-              recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
-            }
-            RecoveringBlock rBlock;
-            if(truncateRecovery) {
-              Block recoveryBlock = (copyOnTruncateRecovery) ? b :
-                  uc.getTruncateBlock();
-              rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
-                  recoveryBlock);
-            } else {
-              rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
-                  uc.getBlockRecoveryId());
-            }
-            brCommand.add(rBlock);
-          }
-          return new DatanodeCommand[] { brCommand };
-        }
+    if (nodeinfo == null || !nodeinfo.isAlive()) {
+      return new DatanodeCommand[]{RegisterCommand.REGISTER};
+    }
+    heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
+        cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
 
-        final List<DatanodeCommand> cmds = new ArrayList<>();
-        //check pending replication
-        List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
-              maxTransfers);
-        if (pendingList != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
-              pendingList));
-        }
-        // checking pending erasure coding tasks
-        List<BlockECRecoveryInfo> pendingECList =
-            nodeinfo.getErasureCodeCommand(maxTransfers);
-        if (pendingECList != null) {
-          cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
-              pendingECList));
-        }
-        //check block invalidation
-        Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
-        if (blks != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
-              blockPoolId, blks));
-        }
-        boolean sendingCachingCommands = false;
-        long nowMs = monotonicNow();
-        if (shouldSendCachingCommands && 
-            ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
-                timeBetweenResendingCachingDirectivesMs)) {
-          DatanodeCommand pendingCacheCommand =
-              getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
-                DatanodeProtocol.DNA_CACHE, blockPoolId);
-          if (pendingCacheCommand != null) {
-            cmds.add(pendingCacheCommand);
-            sendingCachingCommands = true;
-          }
-          DatanodeCommand pendingUncacheCommand =
-              getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
-                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
-          if (pendingUncacheCommand != null) {
-            cmds.add(pendingUncacheCommand);
-            sendingCachingCommands = true;
-          }
-          if (sendingCachingCommands) {
-            nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
-          }
-        }
+    // If we are in safemode, do not send back any recovery / replication
+    // requests. Don't even drain the existing queue of work.
+    if (namesystem.isInSafeMode()) {
+      return new DatanodeCommand[0];
+    }
 
-        blockManager.addKeyUpdateCommand(cmds, nodeinfo);
+    // block recovery command
+    final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
+        nodeinfo);
+    if (brCommand != null) {
+      return new DatanodeCommand[]{brCommand};
+    }
 
-        // check for balancer bandwidth update
-        if (nodeinfo.getBalancerBandwidth() > 0) {
-          cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
-          // set back to 0 to indicate that datanode has been sent the new value
-          nodeinfo.setBalancerBandwidth(0);
-        }
+    final List<DatanodeCommand> cmds = new ArrayList<>();
+    // check pending replication
+    List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+        maxTransfers);
+    if (pendingList != null) {
+      cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+          pendingList));
+    }
+    // check pending erasure coding tasks
+    List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand(
+        maxTransfers);
+    if (pendingECList != null) {
+      cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY,
+          pendingECList));
+    }
+    // check block invalidation
+    Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+    if (blks != null) {
+      cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
+          blks));
+    }
+    // cache commands
+    addCacheCommands(blockPoolId, nodeinfo, cmds);
+    // key update command
+    blockManager.addKeyUpdateCommand(cmds, nodeinfo);
+
+    // check for balancer bandwidth update
+    if (nodeinfo.getBalancerBandwidth() > 0) {
+      cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
+      // set back to 0 to indicate that datanode has been sent the new value
+      nodeinfo.setBalancerBandwidth(0);
+    }
 
-        if (!cmds.isEmpty()) {
-          return cmds.toArray(new DatanodeCommand[cmds.size()]);
-        }
-      }
+    if (!cmds.isEmpty()) {
+      return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
 
     return new DatanodeCommand[0];
@@ -1486,14 +1482,13 @@ public class DatanodeManager {
    *
    * @param list       The {@link CachedBlocksList}.  This function 
    *                   clears the list.
-   * @param datanode   The datanode.
    * @param action     The action to perform in the command.
    * @param poolId     The block pool id.
    * @return           A DatanodeCommand to be sent back to the DN, or null if
    *                   there is nothing to be done.
    */
-  private DatanodeCommand getCacheCommand(CachedBlocksList list,
-      DatanodeDescriptor datanode, int action, String poolId) {
+  private DatanodeCommand getCacheCommand(CachedBlocksList list, int action,
+      String poolId) {
     int length = list.size();
     if (length == 0) {
       return null;
@@ -1501,9 +1496,7 @@ public class DatanodeManager {
     // Read the existing cache commands.
     long[] blockIds = new long[length];
     int i = 0;
-    for (Iterator<CachedBlock> iter = list.iterator();
-            iter.hasNext(); ) {
-      CachedBlock cachedBlock = iter.next();
+    for (CachedBlock cachedBlock : list) {
       blockIds[i++] = cachedBlock.getBlockId();
     }
     return new BlockIdCommand(action, poolId, blockIds);
@@ -1524,7 +1517,7 @@ public class DatanodeManager {
    * @throws IOException
    */
   public void setBalancerBandwidth(long bandwidth) throws IOException {
-    synchronized(datanodeMap) {
+    synchronized(this) {
       for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
         nodeInfo.setBalancerBandwidth(bandwidth);
       }
@@ -1533,7 +1526,7 @@ public class DatanodeManager {
   
   public void markAllDatanodesStale() {
     LOG.info("Marking all datandoes as stale");
-    synchronized (datanodeMap) {
+    synchronized (this) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
           storage.markStaleAfterFailover();
@@ -1548,7 +1541,7 @@ public class DatanodeManager {
    * recoveries, and replication requests.
    */
   public void clearPendingQueues() {
-    synchronized (datanodeMap) {
+    synchronized (this) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         dn.clearBlockQueues();
       }
@@ -1560,7 +1553,7 @@ public class DatanodeManager {
    * know about.
    */
   public void resetLastCachingDirectiveSentTime() {
-    synchronized (datanodeMap) {
+    synchronized (this) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
         dn.setLastCachingDirectiveSentTimeMs(0L);
       }
@@ -1573,9 +1566,11 @@ public class DatanodeManager {
   }
 
   public void clearPendingCachingCommands() {
-    for (DatanodeDescriptor dn : datanodeMap.values()) {
-      dn.getPendingCached().clear();
-      dn.getPendingUncached().clear();
+    synchronized (this) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.getPendingCached().clear();
+        dn.getPendingUncached().clear();
+      }
     }
   }
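
Throughout this file the patch replaces synchronized (datanodeMap) with
synchronized (this), giving DatanodeManager one explicit lock, and the
heavier loops (datanodeDump, refreshDatanodes) now snapshot the map under
the lock and iterate outside it. A minimal sketch of that copy-then-iterate
pattern, written as a hypothetical standalone form of datanodeDump()
(dumpAll is an illustrative name, not part of the patch):

    void dumpAll(java.io.PrintWriter out) {
      final Map<String, DatanodeDescriptor> snapshot;
      synchronized (this) {
        snapshot = new TreeMap<>(datanodeMap);   // copy under the lock
      }
      for (DatanodeDescriptor node : snapshot.values()) {
        out.println(node.dumpDatanode());        // slow I/O runs lock-free
      }
    }

The cost is a full map copy per call; the benefit is that heartbeats and
registrations, which contend on the same lock, are never blocked behind
slow per-node work.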
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d0369aa..9f23b32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.Daemon;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ba6f0e1..b25c5f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -418,9 +418,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    */
   private volatile boolean needRollbackFsImage;
 
-  // Block pool ID used by this namenode
-  private String blockPoolId;
-
   final LeaseManager leaseManager = new LeaseManager(this); 
 
   Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@@ -2348,12 +2345,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   ExtendedBlock getExtendedBlock(Block blk) {
-    return new ExtendedBlock(blockPoolId, blk);
+    return new ExtendedBlock(getBlockPoolId(), blk);
   }
   
   void setBlockPoolId(String bpid) {
-    blockPoolId = bpid;
-    blockManager.setBlockPoolId(blockPoolId);
+    blockManager.setBlockPoolId(bpid);
   }
 
   /**
@@ -3489,11 +3485,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    * The given node has reported in.  This method should:
    * 1) Record the heartbeat, so the datanode isn't timed out
    * 2) Adjust usage stats for future block allocation
-   * 
-   * If a substantial amount of time passed since the last datanode 
-   * heartbeat then request an immediate block report.  
-   * 
-   * @return an array of datanode commands 
+   *
+   * If a substantial amount of time passed since the last datanode
+   * heartbeat then request an immediate block report.
+   *
+   * @return an array of datanode commands
    * @throws IOException
    */
   HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
@@ -3507,7 +3503,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       final int maxTransfer = blockManager.getMaxReplicationStreams()
           - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
-          nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
+          nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
           xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
       long blockReportLeaseId = 0;
       if (requestFullBlockReportLease) {
@@ -5371,7 +5367,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   
   @Override  // NameNodeMXBean
   public String getBlockPoolId() {
-    return blockPoolId;
+    return getBlockManager().getBlockPoolId();
   }
   
   @Override  // NameNodeMXBean
@@ -5960,7 +5956,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   }
 
   void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) {
-    rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId,
+    rollingUpgradeInfo = new RollingUpgradeInfo(getBlockPoolId(),
         createdRollbackImages, startTime, 0L);
   }
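
Taken together with the DatanodeManager hunks above, handleHeartbeat() no
longer runs under the old nested synchronized (heartbeatManager) /
synchronized (datanodeMap) wrappers, and its body is split into the
getBlockRecoveryCommand() and addCacheCommands() helpers. A simplified
skeleton of the refactored flow (argument lists abbreviated; not a verbatim
copy of the method):

    DatanodeDescriptor nodeinfo = getDatanode(nodeReg); // unknown => REGISTER
    heartbeatManager.updateHeartbeat(nodeinfo, /* reports, usage, ... */);
    if (namesystem.isInSafeMode()) {
      return new DatanodeCommand[0];           // no work while in safe mode
    }
    BlockRecoveryCommand br = getBlockRecoveryCommand(blockPoolId, nodeinfo);
    if (br != null) {
      return new DatanodeCommand[]{br};        // recovery preempts other work
    }
    List<DatanodeCommand> cmds = new ArrayList<>();
    // replication, erasure-coding and invalidation commands are added first,
    // then caching, key-update and balancer-bandwidth commands:
    addCacheCommands(blockPoolId, nodeinfo, cmds);
    blockManager.addKeyUpdateCommand(cmds, nodeinfo);
    return cmds.toArray(new DatanodeCommand[cmds.size()]);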