You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/12/15 19:48:04 UTC
hadoop git commit: HDFS-9371. Code cleanup for DatanodeManager.
Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/trunk 0c3a53e5a -> 860269233
HDFS-9371. Code cleanup for DatanodeManager. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/86026923
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/86026923
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/86026923
Branch: refs/heads/trunk
Commit: 8602692338d6f493647205e0241e4116211fab75
Parents: 0c3a53e
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Dec 15 10:47:53 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Dec 15 10:47:53 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../server/blockmanagement/BlockManager.java | 20 +-
.../blockmanagement/DatanodeDescriptor.java | 8 +-
.../server/blockmanagement/DatanodeManager.java | 413 +++++++++----------
.../blockmanagement/HeartbeatManager.java | 2 +
.../hdfs/server/namenode/FSNamesystem.java | 24 +-
6 files changed, 230 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c2f6863..ae0fdc4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -899,6 +899,8 @@ Release 2.9.0 - UNRELEASED
HDFS-9281. Change TestDeleteBlockPool to not explicitly use File to check
block pool existence. (lei)
+ HDFS-9371. Code cleanup for DatanodeManager. (jing9)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index ae1238b..9296726 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -132,6 +132,9 @@ public class BlockManager implements BlockStatsMXBean {
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
+ // Block pool ID used by this namenode
+ private String blockPoolId;
+
private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages();
@@ -462,11 +465,16 @@ public class BlockManager implements BlockStatsMXBean {
}
public void setBlockPoolId(String blockPoolId) {
+ this.blockPoolId = blockPoolId;
if (isBlockTokenEnabled()) {
blockTokenSecretManager.setBlockPoolId(blockPoolId);
}
}
+ public String getBlockPoolId() {
+ return blockPoolId;
+ }
+
public BlockStoragePolicySuite getStoragePolicySuite() {
return storagePolicySuite;
}
@@ -1229,18 +1237,6 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * Remove all block invalidation tasks under this datanode UUID;
- * used when a datanode registers with a new UUID and the old one
- * is wiped.
- */
- void removeFromInvalidates(final DatanodeInfo datanode) {
- if (!isPopulatingReplQueues()) {
- return;
- }
- invalidateBlocks.remove(datanode);
- }
-
- /**
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index e5563eb..6709390 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -290,11 +290,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.isAlive = isAlive;
}
- public boolean needKeyUpdate() {
+ public synchronized boolean needKeyUpdate() {
return needKeyUpdate;
}
- public void setNeedKeyUpdate(boolean needKeyUpdate) {
+ public synchronized void setNeedKeyUpdate(boolean needKeyUpdate) {
this.needKeyUpdate = needKeyUpdate;
}
@@ -868,14 +868,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
/**
* @return balancer bandwidth in bytes per second for this datanode
*/
- public long getBalancerBandwidth() {
+ public synchronized long getBalancerBandwidth() {
return this.bandwidth;
}
/**
* @param bandwidth balancer bandwidth in bytes per second for this datanode
*/
- public void setBalancerBandwidth(long bandwidth) {
+ public synchronized void setBalancerBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index f758454..d535397 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import static org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY;
import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.annotations.VisibleForTesting;
@@ -162,7 +163,7 @@ public class DatanodeManager {
* during rolling upgrades.
* Software version -> Number of datanodes with this version
*/
- private HashMap<String, Integer> datanodesSoftwareVersions =
+ private final HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<>(4, 0.75f);
/**
@@ -352,15 +353,9 @@ public class DatanodeManager {
}
private boolean isInactive(DatanodeInfo datanode) {
- if (datanode.isDecommissioned()) {
- return true;
- }
+ return datanode.isDecommissioned() ||
+ (avoidStaleDataNodesForRead && datanode.isStale(staleInterval));
- if (avoidStaleDataNodesForRead) {
- return datanode.isStale(staleInterval);
- }
-
- return false;
}
/** Sort the located blocks by the distance to the target host. */
@@ -479,8 +474,9 @@ public class DatanodeManager {
if (datanodeUuid == null) {
return null;
}
-
- return datanodeMap.get(datanodeUuid);
+ synchronized (this) {
+ return datanodeMap.get(datanodeUuid);
+ }
}
/**
@@ -490,8 +486,8 @@ public class DatanodeManager {
* @return DatanodeDescriptor or null if the node is not found.
* @throws UnregisteredNodeException
*/
- public DatanodeDescriptor getDatanode(DatanodeID nodeID
- ) throws UnregisteredNodeException {
+ public DatanodeDescriptor getDatanode(DatanodeID nodeID)
+ throws UnregisteredNodeException {
final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
if (node == null)
return null;
@@ -535,13 +531,13 @@ public class DatanodeManager {
/** Prints information about all datanodes. */
void datanodeDump(final PrintWriter out) {
- synchronized (datanodeMap) {
- Map<String,DatanodeDescriptor> sortedDatanodeMap =
- new TreeMap<>(datanodeMap);
- out.println("Metasave: Number of datanodes: " + datanodeMap.size());
- for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
- out.println(node.dumpDatanode());
- }
+ final Map<String,DatanodeDescriptor> sortedDatanodeMap;
+ synchronized (this) {
+ sortedDatanodeMap = new TreeMap<>(datanodeMap);
+ }
+ out.println("Metasave: Number of datanodes: " + sortedDatanodeMap.size());
+ for (DatanodeDescriptor node : sortedDatanodeMap.values()) {
+ out.println(node.dumpDatanode());
}
}
@@ -567,8 +563,8 @@ public class DatanodeManager {
* Remove a datanode
* @throws UnregisteredNodeException
*/
- public void removeDatanode(final DatanodeID node
- ) throws UnregisteredNodeException {
+ public void removeDatanode(final DatanodeID node)
+ throws UnregisteredNodeException {
namesystem.writeLock();
try {
final DatanodeDescriptor descriptor = getDatanode(node);
@@ -585,19 +581,17 @@ public class DatanodeManager {
/** Remove a dead datanode. */
void removeDeadDatanode(final DatanodeID nodeID) {
- synchronized(datanodeMap) {
- DatanodeDescriptor d;
- try {
- d = getDatanode(nodeID);
- } catch(IOException e) {
- d = null;
- }
- if (d != null && isDatanodeDead(d)) {
- NameNode.stateChangeLog.info(
- "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
- removeDatanode(d);
- }
- }
+ DatanodeDescriptor d;
+ try {
+ d = getDatanode(nodeID);
+ } catch(IOException e) {
+ d = null;
+ }
+ if (d != null && isDatanodeDead(d)) {
+ NameNode.stateChangeLog.info(
+ "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
+ removeDatanode(d);
+ }
}
/** Is the datanode dead? */
@@ -611,14 +605,13 @@ public class DatanodeManager {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
- synchronized(datanodeMap) {
+ synchronized(this) {
host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
}
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
- blockManager.getBlockReportLeaseManager().register(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "
@@ -629,11 +622,9 @@ public class DatanodeManager {
/** Physically remove node from datanodeMap. */
private void wipeDatanode(final DatanodeID node) {
final String key = node.getDatanodeUuid();
- synchronized (datanodeMap) {
+ synchronized (this) {
host2DatanodeMap.remove(datanodeMap.remove(key));
}
- // Also remove all block invalidation tasks under this node
- blockManager.removeFromInvalidates(new DatanodeInfo(node));
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
+ node + "): storage " + key
@@ -645,7 +636,7 @@ public class DatanodeManager {
if (version == null) {
return;
}
- synchronized(datanodeMap) {
+ synchronized(this) {
Integer count = this.datanodesSoftwareVersions.get(version);
count = count == null ? 1 : count + 1;
this.datanodesSoftwareVersions.put(version, count);
@@ -656,7 +647,7 @@ public class DatanodeManager {
if (version == null) {
return;
}
- synchronized(datanodeMap) {
+ synchronized(this) {
Integer count = this.datanodesSoftwareVersions.get(version);
if(count != null) {
if(count > 1) {
@@ -674,24 +665,22 @@ public class DatanodeManager {
}
private void countSoftwareVersions() {
- synchronized(datanodeMap) {
- HashMap<String, Integer> versionCount = new HashMap<>();
+ synchronized(this) {
+ datanodesSoftwareVersions.clear();
for(DatanodeDescriptor dn: datanodeMap.values()) {
// Check isAlive too because right after removeDatanode(),
// isDatanodeDead() is still true
- if(shouldCountVersion(dn))
- {
- Integer num = versionCount.get(dn.getSoftwareVersion());
+ if (shouldCountVersion(dn)) {
+ Integer num = datanodesSoftwareVersions.get(dn.getSoftwareVersion());
num = num == null ? 1 : num+1;
- versionCount.put(dn.getSoftwareVersion(), num);
+ datanodesSoftwareVersions.put(dn.getSoftwareVersion(), num);
}
}
- this.datanodesSoftwareVersions = versionCount;
}
}
public HashMap<String, Integer> getDatanodesSoftwareVersions() {
- synchronized(datanodeMap) {
+ synchronized(this) {
return new HashMap<> (this.datanodesSoftwareVersions);
}
}
@@ -747,13 +736,11 @@ public class DatanodeManager {
/**
* Resolve network locations for specified hosts
*
- * @param names
* @return Network locations if available, Else returns null
*/
public List<String> resolveNetworkLocation(List<String> names) {
// resolve its network location
- List<String> rName = dnsToSwitchMapping.resolve(names);
- return rName;
+ return dnsToSwitchMapping.resolve(names);
}
/**
@@ -807,10 +794,9 @@ public class DatanodeManager {
* This is used to avoid displaying a decommissioned datanode to the operators.
* @param nodeList , array list of live or dead nodes.
*/
- private void removeDecomNodeFromList(
+ private static void removeDecomNodeFromList(
final List<DatanodeDescriptor> nodeList) {
- Iterator<DatanodeDescriptor> it=null;
- for (it = nodeList.iterator(); it.hasNext();) {
+ for (Iterator<DatanodeDescriptor> it = nodeList.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (node.isDecommissioned()) {
it.remove();
@@ -968,6 +954,7 @@ public class DatanodeManager {
// register new datanode
addDatanode(nodeDescr);
+ blockManager.getBlockReportLeaseManager().register(nodeDescr);
// also treat the registration message as a heartbeat
// no need to update its timestamp
// because it is done when the descriptor is created
@@ -1030,7 +1017,11 @@ public class DatanodeManager {
* 4. Removed from exclude --> stop decommission.
*/
private void refreshDatanodes() {
- for(DatanodeDescriptor node : datanodeMap.values()) {
+ final Map<String, DatanodeDescriptor> copy;
+ synchronized (this) {
+ copy = new HashMap<>(datanodeMap);
+ }
+ for (DatanodeDescriptor node : copy.values()) {
// Check if not include.
if (!hostFileManager.isIncluded(node)) {
node.setDisallowed(true); // case 2.
@@ -1047,7 +1038,7 @@ public class DatanodeManager {
/** @return the number of live datanodes. */
public int getNumLiveDataNodes() {
int numLive = 0;
- synchronized (datanodeMap) {
+ synchronized (this) {
for(DatanodeDescriptor dn : datanodeMap.values()) {
if (!isDatanodeDead(dn) ) {
numLive++;
@@ -1252,7 +1243,7 @@ public class DatanodeManager {
final HostFileManager.HostSet includedNodes = hostFileManager.getIncludes();
final HostFileManager.HostSet excludedNodes = hostFileManager.getExcludes();
- synchronized(datanodeMap) {
+ synchronized(this) {
nodes = new ArrayList<>(datanodeMap.size());
for (DatanodeDescriptor dn : datanodeMap.values()) {
final boolean isDead = isDatanodeDead(dn);
@@ -1327,155 +1318,160 @@ public class DatanodeManager {
node.setLastUpdateMonotonic(0);
}
+ private BlockRecoveryCommand getBlockRecoveryCommand(String blockPoolId,
+ DatanodeDescriptor nodeinfo) {
+ BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+ if (blocks == null) {
+ return null;
+ }
+ BlockRecoveryCommand brCommand = new BlockRecoveryCommand(blocks.length);
+ for (BlockInfo b : blocks) {
+ BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
+ assert uc != null;
+ final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
+ // Skip stale nodes during recovery
+ final List<DatanodeStorageInfo> recoveryLocations =
+ new ArrayList<>(storages.length);
+ for (DatanodeStorageInfo storage : storages) {
+ if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
+ recoveryLocations.add(storage);
+ }
+ }
+ // If we are performing a truncate recovery then set recovery fields
+ // to old block.
+ boolean truncateRecovery = uc.getTruncateBlock() != null;
+ boolean copyOnTruncateRecovery = truncateRecovery &&
+ uc.getTruncateBlock().getBlockId() != b.getBlockId();
+ ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
+ new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
+ new ExtendedBlock(blockPoolId, b);
+ // If we only get 1 replica after eliminating stale nodes, choose all
+ // replicas for recovery and let the primary data node handle failures.
+ DatanodeInfo[] recoveryInfos;
+ if (recoveryLocations.size() > 1) {
+ if (recoveryLocations.size() != storages.length) {
+ LOG.info("Skipped stale nodes for recovery : "
+ + (storages.length - recoveryLocations.size()));
+ }
+ recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
+ } else {
+ // If too many replicas are stale, then choose all replicas to
+ // participate in block recovery.
+ recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
+ }
+ RecoveringBlock rBlock;
+ if (truncateRecovery) {
+ Block recoveryBlock = (copyOnTruncateRecovery) ? b : uc.getTruncateBlock();
+ rBlock = new RecoveringBlock(primaryBlock, recoveryInfos, recoveryBlock);
+ } else {
+ rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
+ uc.getBlockRecoveryId());
+ }
+ brCommand.add(rBlock);
+ }
+ return brCommand;
+ }
+
+ private void addCacheCommands(String blockPoolId, DatanodeDescriptor nodeinfo,
+ List<DatanodeCommand> cmds) {
+ boolean sendingCachingCommands = false;
+ final long nowMs = monotonicNow();
+ if (shouldSendCachingCommands &&
+ ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
+ timeBetweenResendingCachingDirectivesMs)) {
+ DatanodeCommand pendingCacheCommand = getCacheCommand(
+ nodeinfo.getPendingCached(), DatanodeProtocol.DNA_CACHE,
+ blockPoolId);
+ if (pendingCacheCommand != null) {
+ cmds.add(pendingCacheCommand);
+ sendingCachingCommands = true;
+ }
+ DatanodeCommand pendingUncacheCommand = getCacheCommand(
+ nodeinfo.getPendingUncached(), DatanodeProtocol.DNA_UNCACHE,
+ blockPoolId);
+ if (pendingUncacheCommand != null) {
+ cmds.add(pendingUncacheCommand);
+ sendingCachingCommands = true;
+ }
+ if (sendingCachingCommands) {
+ nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
+ }
+ }
+ }
+
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
- synchronized (heartbeatManager) {
- synchronized (datanodeMap) {
- DatanodeDescriptor nodeinfo;
- try {
- nodeinfo = getDatanode(nodeReg);
- } catch(UnregisteredNodeException e) {
- return new DatanodeCommand[]{RegisterCommand.REGISTER};
- }
-
- // Check if this datanode should actually be shutdown instead.
- if (nodeinfo != null && nodeinfo.isDisallowed()) {
- setDatanodeDead(nodeinfo);
- throw new DisallowedDatanodeException(nodeinfo);
- }
-
- if (nodeinfo == null || !nodeinfo.isAlive()) {
- return new DatanodeCommand[]{RegisterCommand.REGISTER};
- }
-
- heartbeatManager.updateHeartbeat(nodeinfo, reports,
- cacheCapacity, cacheUsed,
- xceiverCount, failedVolumes,
- volumeFailureSummary);
+ final DatanodeDescriptor nodeinfo;
+ try {
+ nodeinfo = getDatanode(nodeReg);
+ } catch (UnregisteredNodeException e) {
+ return new DatanodeCommand[]{RegisterCommand.REGISTER};
+ }
- // If we are in safemode, do not send back any recovery / replication
- // requests. Don't even drain the existing queue of work.
- if(namesystem.isInSafeMode()) {
- return new DatanodeCommand[0];
- }
+ // Check if this datanode should actually be shutdown instead.
+ if (nodeinfo != null && nodeinfo.isDisallowed()) {
+ setDatanodeDead(nodeinfo);
+ throw new DisallowedDatanodeException(nodeinfo);
+ }
- //check lease recovery
- BlockInfo[] blocks = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
- if (blocks != null) {
- BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
- blocks.length);
- for (BlockInfo b : blocks) {
- BlockUnderConstructionFeature uc = b.getUnderConstructionFeature();
- assert uc != null;
- final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
- // Skip stale nodes during recovery - not heart beated for some time (30s by default).
- final List<DatanodeStorageInfo> recoveryLocations =
- new ArrayList<>(storages.length);
- for (DatanodeStorageInfo storage : storages) {
- if (!storage.getDatanodeDescriptor().isStale(staleInterval)) {
- recoveryLocations.add(storage);
- }
- }
- // If we are performing a truncate recovery than set recovery fields
- // to old block.
- boolean truncateRecovery = uc.getTruncateBlock() != null;
- boolean copyOnTruncateRecovery = truncateRecovery &&
- uc.getTruncateBlock().getBlockId() != b.getBlockId();
- ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ?
- new ExtendedBlock(blockPoolId, uc.getTruncateBlock()) :
- new ExtendedBlock(blockPoolId, b);
- // If we only get 1 replica after eliminating stale nodes, then choose all
- // replicas for recovery and let the primary data node handle failures.
- DatanodeInfo[] recoveryInfos;
- if (recoveryLocations.size() > 1) {
- if (recoveryLocations.size() != storages.length) {
- LOG.info("Skipped stale nodes for recovery : " +
- (storages.length - recoveryLocations.size()));
- }
- recoveryInfos =
- DatanodeStorageInfo.toDatanodeInfos(recoveryLocations);
- } else {
- // If too many replicas are stale, then choose all replicas to participate
- // in block recovery.
- recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages);
- }
- RecoveringBlock rBlock;
- if(truncateRecovery) {
- Block recoveryBlock = (copyOnTruncateRecovery) ? b :
- uc.getTruncateBlock();
- rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
- recoveryBlock);
- } else {
- rBlock = new RecoveringBlock(primaryBlock, recoveryInfos,
- uc.getBlockRecoveryId());
- }
- brCommand.add(rBlock);
- }
- return new DatanodeCommand[] { brCommand };
- }
+ if (nodeinfo == null || !nodeinfo.isAlive()) {
+ return new DatanodeCommand[]{RegisterCommand.REGISTER};
+ }
+ heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
+ cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
- final List<DatanodeCommand> cmds = new ArrayList<>();
- //check pending replication
- List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
- maxTransfers);
- if (pendingList != null) {
- cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
- pendingList));
- }
- // checking pending erasure coding tasks
- List<BlockECRecoveryInfo> pendingECList =
- nodeinfo.getErasureCodeCommand(maxTransfers);
- if (pendingECList != null) {
- cmds.add(new BlockECRecoveryCommand(DatanodeProtocol.DNA_ERASURE_CODING_RECOVERY,
- pendingECList));
- }
- //check block invalidation
- Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
- if (blks != null) {
- cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
- blockPoolId, blks));
- }
- boolean sendingCachingCommands = false;
- long nowMs = monotonicNow();
- if (shouldSendCachingCommands &&
- ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
- timeBetweenResendingCachingDirectivesMs)) {
- DatanodeCommand pendingCacheCommand =
- getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
- DatanodeProtocol.DNA_CACHE, blockPoolId);
- if (pendingCacheCommand != null) {
- cmds.add(pendingCacheCommand);
- sendingCachingCommands = true;
- }
- DatanodeCommand pendingUncacheCommand =
- getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
- DatanodeProtocol.DNA_UNCACHE, blockPoolId);
- if (pendingUncacheCommand != null) {
- cmds.add(pendingUncacheCommand);
- sendingCachingCommands = true;
- }
- if (sendingCachingCommands) {
- nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
- }
- }
+ // If we are in safemode, do not send back any recovery / replication
+ // requests. Don't even drain the existing queue of work.
+ if (namesystem.isInSafeMode()) {
+ return new DatanodeCommand[0];
+ }
- blockManager.addKeyUpdateCommand(cmds, nodeinfo);
+ // block recovery command
+ final BlockRecoveryCommand brCommand = getBlockRecoveryCommand(blockPoolId,
+ nodeinfo);
+ if (brCommand != null) {
+ return new DatanodeCommand[]{brCommand};
+ }
- // check for balancer bandwidth update
- if (nodeinfo.getBalancerBandwidth() > 0) {
- cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
- // set back to 0 to indicate that datanode has been sent the new value
- nodeinfo.setBalancerBandwidth(0);
- }
+ final List<DatanodeCommand> cmds = new ArrayList<>();
+ // check pending replication
+ List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
+ maxTransfers);
+ if (pendingList != null) {
+ cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
+ pendingList));
+ }
+ // check pending erasure coding tasks
+ List<BlockECRecoveryInfo> pendingECList = nodeinfo.getErasureCodeCommand(
+ maxTransfers);
+ if (pendingECList != null) {
+ cmds.add(new BlockECRecoveryCommand(DNA_ERASURE_CODING_RECOVERY,
+ pendingECList));
+ }
+ // check block invalidation
+ Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+ if (blks != null) {
+ cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId,
+ blks));
+ }
+ // cache commands
+ addCacheCommands(blockPoolId, nodeinfo, cmds);
+ // key update command
+ blockManager.addKeyUpdateCommand(cmds, nodeinfo);
+
+ // check for balancer bandwidth update
+ if (nodeinfo.getBalancerBandwidth() > 0) {
+ cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
+ // set back to 0 to indicate that datanode has been sent the new value
+ nodeinfo.setBalancerBandwidth(0);
+ }
- if (!cmds.isEmpty()) {
- return cmds.toArray(new DatanodeCommand[cmds.size()]);
- }
- }
+ if (!cmds.isEmpty()) {
+ return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
return new DatanodeCommand[0];
@@ -1486,14 +1482,13 @@ public class DatanodeManager {
*
* @param list The {@link CachedBlocksList}. This function
* clears the list.
- * @param datanode The datanode.
* @param action The action to perform in the command.
* @param poolId The block pool id.
* @return A DatanodeCommand to be sent back to the DN, or null if
* there is nothing to be done.
*/
- private DatanodeCommand getCacheCommand(CachedBlocksList list,
- DatanodeDescriptor datanode, int action, String poolId) {
+ private DatanodeCommand getCacheCommand(CachedBlocksList list, int action,
+ String poolId) {
int length = list.size();
if (length == 0) {
return null;
@@ -1501,9 +1496,7 @@ public class DatanodeManager {
// Read the existing cache commands.
long[] blockIds = new long[length];
int i = 0;
- for (Iterator<CachedBlock> iter = list.iterator();
- iter.hasNext(); ) {
- CachedBlock cachedBlock = iter.next();
+ for (CachedBlock cachedBlock : list) {
blockIds[i++] = cachedBlock.getBlockId();
}
return new BlockIdCommand(action, poolId, blockIds);
@@ -1524,7 +1517,7 @@ public class DatanodeManager {
* @throws IOException
*/
public void setBalancerBandwidth(long bandwidth) throws IOException {
- synchronized(datanodeMap) {
+ synchronized(this) {
for (DatanodeDescriptor nodeInfo : datanodeMap.values()) {
nodeInfo.setBalancerBandwidth(bandwidth);
}
@@ -1533,7 +1526,7 @@ public class DatanodeManager {
public void markAllDatanodesStale() {
LOG.info("Marking all datandoes as stale");
- synchronized (datanodeMap) {
+ synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
storage.markStaleAfterFailover();
@@ -1548,7 +1541,7 @@ public class DatanodeManager {
* recoveries, and replication requests.
*/
public void clearPendingQueues() {
- synchronized (datanodeMap) {
+ synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.clearBlockQueues();
}
@@ -1560,7 +1553,7 @@ public class DatanodeManager {
* know about.
*/
public void resetLastCachingDirectiveSentTime() {
- synchronized (datanodeMap) {
+ synchronized (this) {
for (DatanodeDescriptor dn : datanodeMap.values()) {
dn.setLastCachingDirectiveSentTimeMs(0L);
}
@@ -1573,9 +1566,11 @@ public class DatanodeManager {
}
public void clearPendingCachingCommands() {
- for (DatanodeDescriptor dn : datanodeMap.values()) {
- dn.getPendingCached().clear();
- dn.getPendingUncached().clear();
+ synchronized (this) {
+ for (DatanodeDescriptor dn : datanodeMap.values()) {
+ dn.getPendingCached().clear();
+ dn.getPendingUncached().clear();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d0369aa..9f23b32 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -27,6 +27,8 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.Daemon;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/86026923/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index ba6f0e1..b25c5f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -418,9 +418,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
*/
private volatile boolean needRollbackFsImage;
- // Block pool ID used by this namenode
- private String blockPoolId;
-
final LeaseManager leaseManager = new LeaseManager(this);
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@@ -2348,12 +2345,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
ExtendedBlock getExtendedBlock(Block blk) {
- return new ExtendedBlock(blockPoolId, blk);
+ return new ExtendedBlock(getBlockPoolId(), blk);
}
void setBlockPoolId(String bpid) {
- blockPoolId = bpid;
- blockManager.setBlockPoolId(blockPoolId);
+ blockManager.setBlockPoolId(bpid);
}
/**
@@ -3489,11 +3485,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* The given node has reported in. This method should:
* 1) Record the heartbeat, so the datanode isn't timed out
* 2) Adjust usage stats for future block allocation
- *
- * If a substantial amount of time passed since the last datanode
- * heartbeat then request an immediate block report.
- *
- * @return an array of datanode commands
+ *
+ * If a substantial amount of time passed since the last datanode
+ * heartbeat then request an immediate block report.
+ *
+ * @return an array of datanode commands
* @throws IOException
*/
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
@@ -3507,7 +3503,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
- nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
+ nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
@@ -5371,7 +5367,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
@Override // NameNodeMXBean
public String getBlockPoolId() {
- return blockPoolId;
+ return getBlockManager().getBlockPoolId();
}
@Override // NameNodeMXBean
@@ -5960,7 +5956,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
void setRollingUpgradeInfo(boolean createdRollbackImages, long startTime) {
- rollingUpgradeInfo = new RollingUpgradeInfo(blockPoolId,
+ rollingUpgradeInfo = new RollingUpgradeInfo(getBlockPoolId(),
createdRollbackImages, startTime, 0L);
}