You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mi...@apache.org on 2016/10/18 00:46:08 UTC
[2/2] hadoop git commit: HDFS-9390. Block management for maintenance
states.
HDFS-9390. Block management for maintenance states.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b61fb267
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b61fb267
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b61fb267
Branch: refs/heads/trunk
Commit: b61fb267b92b2736920b4bd0c673d31e7632ebb9
Parents: f5d9235
Author: Ming Ma <mi...@apache.org>
Authored: Mon Oct 17 17:45:41 2016 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Mon Oct 17 17:45:41 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 53 +-
.../hadoop/hdfs/server/balancer/Dispatcher.java | 11 +-
.../server/blockmanagement/BlockManager.java | 249 ++++--
.../BlockPlacementPolicyDefault.java | 4 +-
.../CacheReplicationMonitor.java | 2 +-
.../blockmanagement/DatanodeDescriptor.java | 35 +-
.../server/blockmanagement/DatanodeManager.java | 47 +-
.../blockmanagement/DecommissionManager.java | 142 +++-
.../blockmanagement/ErasureCodingWork.java | 16 +-
.../blockmanagement/HeartbeatManager.java | 23 +-
.../blockmanagement/LowRedundancyBlocks.java | 47 +-
.../server/blockmanagement/NumberReplicas.java | 30 +-
.../blockmanagement/StorageTypeStats.java | 8 +-
.../hdfs/server/namenode/FSNamesystem.java | 9 +-
.../src/main/resources/hdfs-default.xml | 7 +
.../apache/hadoop/hdfs/AdminStatesBaseTest.java | 20 +-
.../apache/hadoop/hdfs/TestDecommission.java | 2 +-
.../hadoop/hdfs/TestMaintenanceState.java | 775 +++++++++++++++++--
.../blockmanagement/TestBlockManager.java | 8 +-
.../namenode/TestDecommissioningStatus.java | 57 +-
.../namenode/TestNamenodeCapacityReport.java | 78 +-
.../hadoop/hdfs/util/HostsFileWriter.java | 1 +
23 files changed, 1240 insertions(+), 389 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 10c0ad6..d54c109 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -220,6 +220,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
"dfs.namenode.reconstruction.pending.timeout-sec";
public static final int DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
+ public static final String DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
+ "dfs.namenode.maintenance.replication.min";
+ public static final int DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
+ = 1;
+
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY =
HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 83870cf..23166e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -124,48 +124,57 @@ public class DFSUtil {
}
/**
- * Compartor for sorting DataNodeInfo[] based on decommissioned states.
- * Decommissioned nodes are moved to the end of the array on sorting with
- * this compartor.
+ * Comparator for sorting DataNodeInfo[] based on
+ * decommissioned and entering_maintenance states.
*/
- public static final Comparator<DatanodeInfo> DECOM_COMPARATOR =
- new Comparator<DatanodeInfo>() {
- @Override
- public int compare(DatanodeInfo a, DatanodeInfo b) {
- return a.isDecommissioned() == b.isDecommissioned() ? 0 :
- a.isDecommissioned() ? 1 : -1;
+ public static class ServiceComparator implements Comparator<DatanodeInfo> {
+ @Override
+ public int compare(DatanodeInfo a, DatanodeInfo b) {
+ // Decommissioned nodes will still be moved to the end of the list
+ if (a.isDecommissioned()) {
+ return b.isDecommissioned() ? 0 : 1;
+ } else if (b.isDecommissioned()) {
+ return -1;
}
- };
+ // ENTERING_MAINTENANCE nodes should be after live nodes.
+ if (a.isEnteringMaintenance()) {
+ return b.isEnteringMaintenance() ? 0 : 1;
+ } else if (b.isEnteringMaintenance()) {
+ return -1;
+ } else {
+ return 0;
+ }
+ }
+ }
/**
- * Comparator for sorting DataNodeInfo[] based on decommissioned/stale states.
- * Decommissioned/stale nodes are moved to the end of the array on sorting
- * with this comparator.
- */
+ * Comparator for sorting DataNodeInfo[] based on
+ * stale, decommissioned and entering_maintenance states.
+ * Order: live -> stale -> entering_maintenance -> decommissioned
+ */
@InterfaceAudience.Private
- public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
+ public static class ServiceAndStaleComparator extends ServiceComparator {
private final long staleInterval;
/**
- * Constructor of DecomStaleComparator
+ * Constructor of ServiceAndStaleComparator
*
* @param interval
* The time interval for marking datanodes as stale is passed from
* outside, since the interval may be changed dynamically
*/
- public DecomStaleComparator(long interval) {
+ public ServiceAndStaleComparator(long interval) {
this.staleInterval = interval;
}
@Override
public int compare(DatanodeInfo a, DatanodeInfo b) {
- // Decommissioned nodes will still be moved to the end of the list
- if (a.isDecommissioned()) {
- return b.isDecommissioned() ? 0 : 1;
- } else if (b.isDecommissioned()) {
- return -1;
+ int ret = super.compare(a, b);
+ if (ret != 0) {
+ return ret;
}
+
// Stale nodes will be moved behind the normal nodes
boolean aStale = a.isStale(staleInterval);
boolean bStale = b.isStale(staleInterval);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index aea0ae4..e5c5e53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -989,20 +989,17 @@ public class Dispatcher {
}
private boolean shouldIgnore(DatanodeInfo dn) {
- // ignore decommissioned nodes
- final boolean decommissioned = dn.isDecommissioned();
- // ignore decommissioning nodes
- final boolean decommissioning = dn.isDecommissionInProgress();
+ // ignore out-of-service nodes
+ final boolean outOfService = !dn.isInService();
// ignore nodes in exclude list
final boolean excluded = Util.isExcluded(excludedNodes, dn);
// ignore nodes not in the include list (if include list is not empty)
final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
- if (decommissioned || decommissioning || excluded || notIncluded) {
+ if (outOfService || excluded || notIncluded) {
if (LOG.isTraceEnabled()) {
LOG.trace("Excluding datanode " + dn
- + ": decommissioned=" + decommissioned
- + ", decommissioning=" + decommissioning
+ + ": outOfService=" + outOfService
+ ", excluded=" + excluded
+ ", notIncluded=" + notIncluded);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7b13add..03bdb7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -126,6 +126,29 @@ import org.slf4j.LoggerFactory;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
+ * For block state management, it tries to maintain the safety
+ * property of "# of live replicas == # of expected redundancy" under
+ * any events such as decommission, namenode failover, datanode failure.
+ *
+ * The motivation of maintenance mode is to allow admins to quickly repair nodes
+ * without paying the cost of decommission. Thus with maintenance mode,
+ * # of live replicas doesn't have to be equal to # of expected redundancy.
+ * If any of the replicas is in maintenance mode, the safety property
+ * is extended as follows. These properties still apply for the case of zero
+ * maintenance replicas, thus we can use these safety properties for all scenarios.
+ * a. # of live replicas >= # of min replication for maintenance.
+ * b. # of live replicas <= # of expected redundancy.
+ * c. # of live replicas and maintenance replicas >= # of expected redundancy.
+ *
+ * For regular replication, # of min live replicas for maintenance is determined
+ * by DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY. This number has to be <=
+ * DFS_NAMENODE_REPLICATION_MIN_KEY.
+ * For erasure coding, # of min live replicas for maintenance is
+ * BlockInfoStriped#getRealDataBlockNum.
+ *
+ * Another safety property is to satisfy the block placement policy. While the
+ * policy is configurable, the replicas the policy is applied to are the live
+ * replicas + maintenance replicas.
*/
@InterfaceAudience.Private
public class BlockManager implements BlockStatsMXBean {
@@ -341,6 +364,11 @@ public class BlockManager implements BlockStatsMXBean {
private final BlockIdManager blockIdManager;
+ /** Minimum live replicas needed for the datanode to be transitioned
+ * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
+ */
+ private final short minReplicationToBeInMaintenance;
+
public BlockManager(final Namesystem namesystem, boolean haEnabled,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -373,13 +401,13 @@ public class BlockManager implements BlockStatsMXBean {
this.maxCorruptFilesReturned = conf.getInt(
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
- this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
- DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+ this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+ DFSConfigKeys.DFS_REPLICATION_DEFAULT);
- final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
- DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
+ final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
+ DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
- DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
if (minR <= 0)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
@@ -407,7 +435,7 @@ public class BlockManager implements BlockStatsMXBean {
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
- this.replicationRecheckInterval =
+ this.replicationRecheckInterval =
conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
TimeUnit.SECONDS) * 1000L;
@@ -428,7 +456,7 @@ public class BlockManager implements BlockStatsMXBean {
this.encryptDataTransfer =
conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-
+
this.maxNumBlocksToLog =
conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -438,6 +466,25 @@ public class BlockManager implements BlockStatsMXBean {
this.getBlocksMinBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+
+ final int minMaintenanceR = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
+ DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
+
+ if (minMaintenanceR < 0) {
+ throw new IOException("Unexpected configuration parameters: "
+ + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+ + " = " + minMaintenanceR + " < 0");
+ }
+ if (minMaintenanceR > minR) {
+ throw new IOException("Unexpected configuration parameters: "
+ + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+ + " = " + minMaintenanceR + " > "
+ + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ + " = " + minR);
+ }
+ this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
+
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -668,7 +715,7 @@ public class BlockManager implements BlockStatsMXBean {
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
}
-
+
/**
* Dump the metadata for the given block in a human-readable
* form.
@@ -697,12 +744,12 @@ public class BlockManager implements BlockStatsMXBean {
out.print(fileName + ": ");
}
// l: == live:, d: == decommissioned c: == corrupt e: == excess
- out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
+ out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
" (replicas:" +
" l: " + numReplicas.liveReplicas() +
" d: " + numReplicas.decommissionedAndDecommissioning() +
" c: " + numReplicas.corruptReplicas() +
- " e: " + numReplicas.excessReplicas() + ") ");
+ " e: " + numReplicas.excessReplicas() + ") ");
Collection<DatanodeDescriptor> corruptNodes =
corruptReplicas.getNodes(block);
@@ -750,6 +797,18 @@ public class BlockManager implements BlockStatsMXBean {
}
}
+ public short getMinReplicationToBeInMaintenance() {
+ return minReplicationToBeInMaintenance;
+ }
+
+ private short getMinMaintenanceStorageNum(BlockInfo block) {
+ if (block.isStriped()) {
+ return ((BlockInfoStriped) block).getRealDataBlockNum();
+ } else {
+ return minReplicationToBeInMaintenance;
+ }
+ }
+
public boolean hasMinStorage(BlockInfo block) {
return countNodes(block).liveReplicas() >= getMinStorageNum(block);
}
@@ -942,7 +1001,7 @@ public class BlockManager implements BlockStatsMXBean {
NumberReplicas replicas = countNodes(lastBlock);
neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
replicas.readOnlyReplicas(),
- replicas.decommissionedAndDecommissioning(), getRedundancy(lastBlock));
+ replicas.outOfServiceReplicas(), getExpectedRedundancyNum(lastBlock));
pendingReconstruction.remove(lastBlock);
// remove this block from the list of pending blocks to be deleted.
@@ -1078,7 +1137,8 @@ public class BlockManager implements BlockStatsMXBean {
} else {
isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes;
}
- final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+ int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+ numMachines -= numReplicas.maintenanceNotForReadReplicas();
DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
int j = 0, i = 0;
@@ -1086,11 +1146,17 @@ public class BlockManager implements BlockStatsMXBean {
final boolean noCorrupt = (numCorruptReplicas == 0);
for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
if (storage.getState() != State.FAILED) {
+ final DatanodeDescriptor d = storage.getDatanodeDescriptor();
+ // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states.
+ if (d.isInMaintenance()
+ || (d.isEnteringMaintenance() && !d.isAlive())) {
+ continue;
+ }
+
if (noCorrupt) {
machines[j++] = storage;
i = setBlockIndices(blk, blockIndices, i, storage);
} else {
- final DatanodeDescriptor d = storage.getDatanodeDescriptor();
final boolean replicaCorrupt = isReplicaCorrupt(blk, d);
if (isCorrupt || !replicaCorrupt) {
machines[j++] = storage;
@@ -1106,7 +1172,7 @@ public class BlockManager implements BlockStatsMXBean {
}
assert j == machines.length :
- "isCorrupt: " + isCorrupt +
+ "isCorrupt: " + isCorrupt +
" numMachines: " + numMachines +
" numNodes: " + numNodes +
" numCorrupt: " + numCorruptNodes +
@@ -1700,8 +1766,11 @@ public class BlockManager implements BlockStatsMXBean {
return scheduledWork;
}
+ // Check if the number of live + pending replicas satisfies
+ // the expected redundancy.
boolean hasEnoughEffectiveReplicas(BlockInfo block,
- NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+ NumberReplicas numReplicas, int pendingReplicaNum) {
+ int required = getExpectedLiveRedundancyNum(block, numReplicas);
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
return (numEffectiveReplicas >= required) &&
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
@@ -1716,8 +1785,6 @@ public class BlockManager implements BlockStatsMXBean {
return null;
}
- short requiredRedundancy = getExpectedRedundancyNum(block);
-
// get a source data-node
List<DatanodeDescriptor> containingNodes = new ArrayList<>();
List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
@@ -1726,6 +1793,8 @@ public class BlockManager implements BlockStatsMXBean {
final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
containingNodes, liveReplicaNodes, numReplicas,
liveBlockIndices, priority);
+ short requiredRedundancy = getExpectedLiveRedundancyNum(block,
+ numReplicas);
if(srcNodes == null || srcNodes.length == 0) {
// block can not be reconstructed from any node
LOG.debug("Block " + block + " cannot be reconstructed " +
@@ -1738,8 +1807,7 @@ public class BlockManager implements BlockStatsMXBean {
assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
int pendingNum = pendingReconstruction.getNumReplicas(block);
- if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
- requiredRedundancy)) {
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
neededReconstruction.remove(block, priority);
blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
" it has enough replicas", block);
@@ -1763,9 +1831,11 @@ public class BlockManager implements BlockStatsMXBean {
// should reconstruct all the internal blocks before scheduling
// replication task for decommissioning node(s).
- if (additionalReplRequired - numReplicas.decommissioning() > 0) {
- additionalReplRequired = additionalReplRequired
- - numReplicas.decommissioning();
+ if (additionalReplRequired - numReplicas.decommissioning() -
+ numReplicas.liveEnteringMaintenanceReplicas() > 0) {
+ additionalReplRequired = additionalReplRequired -
+ numReplicas.decommissioning() -
+ numReplicas.liveEnteringMaintenanceReplicas();
}
byte[] indices = new byte[liveBlockIndices.size()];
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
@@ -1807,11 +1877,11 @@ public class BlockManager implements BlockStatsMXBean {
}
// do not schedule more if enough replicas is already pending
- final short requiredRedundancy = getExpectedRedundancyNum(block);
NumberReplicas numReplicas = countNodes(block);
+ final short requiredRedundancy =
+ getExpectedLiveRedundancyNum(block, numReplicas);
final int pendingNum = pendingReconstruction.getNumReplicas(block);
- if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
- requiredRedundancy)) {
+ if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
neededReconstruction.remove(block, priority);
rw.resetTargets();
blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@@ -1880,7 +1950,7 @@ public class BlockManager implements BlockStatsMXBean {
* @throws IOException
* if the number of targets < minimum replication.
* @see BlockPlacementPolicy#chooseTarget(String, int, Node,
- * Set, long, List, BlockStoragePolicy)
+ * Set, long, List, BlockStoragePolicy, EnumSet)
*/
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final Node client,
@@ -1987,13 +2057,15 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
- // never use already decommissioned nodes or unknown state replicas
- if (state == null || state == StoredReplicaState.DECOMMISSIONED) {
+ // never use already decommissioned nodes, maintenance node not
+ // suitable for read or unknown state replicas.
+ if (state == null || state == StoredReplicaState.DECOMMISSIONED
+ || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
continue;
}
if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
- && !node.isDecommissionInProgress()
+ && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
&& node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
continue; // already reached replication limit
}
@@ -2045,10 +2117,10 @@ public class BlockManager implements BlockStatsMXBean {
continue;
}
NumberReplicas num = countNodes(timedOutItems[i]);
- if (isNeededReconstruction(bi, num.liveReplicas())) {
+ if (isNeededReconstruction(bi, num)) {
neededReconstruction.add(bi, num.liveReplicas(),
- num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
- getRedundancy(bi));
+ num.readOnlyReplicas(), num.outOfServiceReplicas(),
+ getExpectedRedundancyNum(bi));
}
}
} finally {
@@ -3014,10 +3086,9 @@ public class BlockManager implements BlockStatsMXBean {
// handle low redundancy/extra redundancy
short fileRedundancy = getExpectedRedundancyNum(storedBlock);
- if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
+ if (!isNeededReconstruction(storedBlock, num, pendingNum)) {
neededReconstruction.remove(storedBlock, numCurrentReplica,
- num.readOnlyReplicas(),
- num.decommissionedAndDecommissioning(), fileRedundancy);
+ num.readOnlyReplicas(), num.outOfServiceReplicas(), fileRedundancy);
} else {
updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
}
@@ -3040,6 +3111,10 @@ public class BlockManager implements BlockStatsMXBean {
return storedBlock;
}
+ // If there is any maintenance replica, we don't have to restore
+ // the condition of live + maintenance == expected. We allow
+ // live + maintenance >= expected. The extra redundancy will be removed
+ // when the maintenance node changes to live.
private boolean shouldProcessExtraRedundancy(NumberReplicas num,
int expectedNum) {
final int numCurrent = num.liveReplicas();
@@ -3255,9 +3330,9 @@ public class BlockManager implements BlockStatsMXBean {
NumberReplicas num = countNodes(block);
final int numCurrentReplica = num.liveReplicas();
// add to low redundancy queue if need to be
- if (isNeededReconstruction(block, numCurrentReplica)) {
+ if (isNeededReconstruction(block, num)) {
if (neededReconstruction.add(block, numCurrentReplica,
- num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+ num.readOnlyReplicas(), num.outOfServiceReplicas(),
expectedRedundancy)) {
return MisReplicationResult.UNDER_REPLICATED;
}
@@ -3290,9 +3365,9 @@ public class BlockManager implements BlockStatsMXBean {
// update neededReconstruction priority queues
b.setReplication(newRepl);
+ NumberReplicas num = countNodes(b);
updateNeededReconstructions(b, 0, newRepl - oldRepl);
-
- if (oldRepl > newRepl) {
+ if (shouldProcessExtraRedundancy(num, newRepl)) {
processExtraRedundancyBlock(b, newRepl, null, null);
}
}
@@ -3318,14 +3393,14 @@ public class BlockManager implements BlockStatsMXBean {
}
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
if (storage.areBlockContentsStale()) {
- LOG.trace("BLOCK* processOverReplicatedBlock: Postponing {}"
+ LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}"
+ " since storage {} does not yet have up-to-date information.",
block, storage);
postponeBlock(block);
return;
}
if (!isExcess(cur, block)) {
- if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+ if (cur.isInService()) {
// exclude corrupt replicas
if (corruptNodes == null || !corruptNodes.contains(cur)) {
nonExcess.add(storage);
@@ -3766,7 +3841,7 @@ public class BlockManager implements BlockStatsMXBean {
return countNodes(b, false);
}
- private NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
+ NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
NumberReplicas numberReplicas = new NumberReplicas();
Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
if (b.isStriped()) {
@@ -3797,6 +3872,12 @@ public class BlockManager implements BlockStatsMXBean {
s = StoredReplicaState.DECOMMISSIONING;
} else if (node.isDecommissioned()) {
s = StoredReplicaState.DECOMMISSIONED;
+ } else if (node.isMaintenance()) {
+ if (node.isInMaintenance() || !node.isAlive()) {
+ s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
+ } else {
+ s = StoredReplicaState.MAINTENANCE_FOR_READ;
+ }
} else if (isExcess(node, b)) {
s = StoredReplicaState.EXCESS;
} else {
@@ -3868,11 +3949,11 @@ public class BlockManager implements BlockStatsMXBean {
}
/**
- * On stopping decommission, check if the node has excess replicas.
+ * On putting the node in service, check if the node has excess replicas.
* If there are any excess replicas, call processExtraRedundancyBlock().
* Process extra redundancy blocks only when active NN is out of safe mode.
*/
- void processExtraRedundancyBlocksOnReCommission(
+ void processExtraRedundancyBlocksOnInService(
final DatanodeDescriptor srcNode) {
if (!isPopulatingReplQueues()) {
return;
@@ -3881,7 +3962,7 @@ public class BlockManager implements BlockStatsMXBean {
int numExtraRedundancy = 0;
while(it.hasNext()) {
final BlockInfo block = it.next();
- int expectedReplication = this.getRedundancy(block);
+ int expectedReplication = this.getExpectedRedundancyNum(block);
NumberReplicas num = countNodes(block);
if (shouldProcessExtraRedundancy(num, expectedReplication)) {
// extra redundancy block
@@ -3891,14 +3972,15 @@ public class BlockManager implements BlockStatsMXBean {
}
}
LOG.info("Invalidated " + numExtraRedundancy
- + " extra redundancy blocks on " + srcNode + " during recommissioning");
+ + " extra redundancy blocks on " + srcNode + " after it is in service");
}
/**
- * Returns whether a node can be safely decommissioned based on its
- * liveness. Dead nodes cannot always be safely decommissioned.
+ * Returns whether a node can be safely decommissioned or put in maintenance
+ * based on its liveness. Dead nodes cannot always be safely decommissioned
+ * or put in maintenance.
*/
- boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+ boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) {
if (!node.checkBlockReportReceived()) {
LOG.info("Node {} hasn't sent its first block report.", node);
return false;
@@ -3912,17 +3994,18 @@ public class BlockManager implements BlockStatsMXBean {
if (pendingReconstructionBlocksCount == 0 &&
lowRedundancyBlocksCount == 0) {
LOG.info("Node {} is dead and there are no low redundancy" +
- " blocks or blocks pending reconstruction. Safe to decommission.",
- node);
+ " blocks or blocks pending reconstruction. Safe to decommission or",
+ " put in maintenance.", node);
return true;
}
LOG.warn("Node {} is dead " +
- "while decommission is in progress. Cannot be safely " +
- "decommissioned since there is risk of reduced " +
- "data durability or data loss. Either restart the failed node or" +
- " force decommissioning by removing, calling refreshNodes, " +
- "then re-adding to the excludes files.", node);
+ "while in {}. Cannot be safely " +
+ "decommissioned or be in maintenance since there is risk of reduced " +
+ "data durability or data loss. Either restart the failed node or " +
+ "force decommissioning or maintenance by removing, calling " +
+ "refreshNodes, then re-adding to the excludes or host config files.",
+ node, node.getAdminState());
return false;
}
@@ -3990,17 +4073,16 @@ public class BlockManager implements BlockStatsMXBean {
}
NumberReplicas repl = countNodes(block);
int pendingNum = pendingReconstruction.getNumReplicas(block);
- int curExpectedReplicas = getRedundancy(block);
- if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
- curExpectedReplicas)) {
+ int curExpectedReplicas = getExpectedRedundancyNum(block);
+ if (!hasEnoughEffectiveReplicas(block, repl, pendingNum)) {
neededReconstruction.update(block, repl.liveReplicas() + pendingNum,
- repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+ repl.readOnlyReplicas(), repl.outOfServiceReplicas(),
curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
} else {
int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
- repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
+ repl.outOfServiceReplicas(), oldExpectedReplicas);
}
} finally {
namesystem.writeUnlock();
@@ -4018,24 +4100,15 @@ public class BlockManager implements BlockStatsMXBean {
short expected = getExpectedRedundancyNum(block);
final NumberReplicas n = countNodes(block);
final int pending = pendingReconstruction.getNumReplicas(block);
- if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
+ if (!hasEnoughEffectiveReplicas(block, n, pending)) {
neededReconstruction.add(block, n.liveReplicas() + pending,
- n.readOnlyReplicas(),
- n.decommissionedAndDecommissioning(), expected);
+ n.readOnlyReplicas(), n.outOfServiceReplicas(), expected);
} else if (shouldProcessExtraRedundancy(n, expected)) {
processExtraRedundancyBlock(block, expected, null, null);
}
}
}
- /**
- * @return 0 if the block is not found;
- * otherwise, return the replication factor of the block.
- */
- private int getRedundancy(BlockInfo block) {
- return getExpectedRedundancyNum(block);
- }
-
/**
* Get blocks to invalidate for <i>nodeId</i>
* in {@link #invalidateBlocks}.
@@ -4088,6 +4161,8 @@ public class BlockManager implements BlockStatsMXBean {
.getNodes(storedBlock);
for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+ // Nodes under maintenance should be counted as valid replicas from
+ // rack policy point of view.
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
&& ((corruptNodes == null) || !corruptNodes.contains(cur))) {
liveNodes.add(cur);
@@ -4102,14 +4177,36 @@ public class BlockManager implements BlockStatsMXBean {
.isPlacementPolicySatisfied();
}
+ boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock,
+ NumberReplicas numberReplicas) {
+ return storedBlock.isComplete() && (numberReplicas.liveReplicas() <
+ getMinMaintenanceStorageNum(storedBlock) ||
+ !isPlacementPolicySatisfied(storedBlock));
+ }
+
+ boolean isNeededReconstruction(BlockInfo storedBlock,
+ NumberReplicas numberReplicas) {
+ return isNeededReconstruction(storedBlock, numberReplicas, 0);
+ }
+
/**
* A block needs reconstruction if the number of redundancies is less than
* expected or if it does not have enough racks.
*/
- boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
- int expected = getExpectedRedundancyNum(storedBlock);
- return storedBlock.isComplete()
- && (current < expected || !isPlacementPolicySatisfied(storedBlock));
+ boolean isNeededReconstruction(BlockInfo storedBlock,
+ NumberReplicas numberReplicas, int pending) {
+ return storedBlock.isComplete() &&
+ !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
+ }
+
+ // Exclude maintenance, but make sure it has minimal live replicas
+ // to satisfy the maintenance requirement.
+ public short getExpectedLiveRedundancyNum(BlockInfo block,
+ NumberReplicas numberReplicas) {
+ final short expectedRedundancy = getExpectedRedundancyNum(block);
+ return (short)Math.max(expectedRedundancy -
+ numberReplicas.maintenanceReplicas(),
+ getMinMaintenanceStorageNum(block));
}
public short getExpectedRedundancyNum(BlockInfo block) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 3958c73..0390546 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -833,8 +833,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
List<DatanodeStorageInfo> results,
boolean avoidStaleNodes) {
// check if the node is (being) decommissioned
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- logNodeIsNotChosen(node, "the node is (being) decommissioned ");
+ if (!node.isInService()) {
+ logNodeIsNotChosen(node, "the node isn't in service.");
return false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index ca8d72a..8563cf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -682,7 +682,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
if (datanode == null) {
continue;
}
- if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+ if (!datanode.isInService()) {
continue;
}
if (corrupt != null && corrupt.contains(datanode)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 6d163ec..f7da52a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -146,8 +146,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
// Stores status of decommissioning.
// If node is not decommissioning, do not use this object for anything.
- public final DecommissioningStatus decommissioningStatus =
- new DecommissioningStatus();
+ private final LeavingServiceStatus leavingServiceStatus =
+ new LeavingServiceStatus();
private final Map<String, DatanodeStorageInfo> storageMap =
new HashMap<>();
@@ -276,6 +276,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.needKeyUpdate = needKeyUpdate;
}
+ public LeavingServiceStatus getLeavingServiceStatus() {
+ return leavingServiceStatus;
+ }
+
@VisibleForTesting
public DatanodeStorageInfo getStorageInfo(String storageID) {
synchronized (storageMap) {
@@ -729,51 +733,54 @@ public class DatanodeDescriptor extends DatanodeInfo {
return (this == obj) || super.equals(obj);
}
- /** Decommissioning status */
- public class DecommissioningStatus {
+ /** Leaving service status. */
+ public class LeavingServiceStatus {
private int underReplicatedBlocks;
- private int decommissionOnlyReplicas;
+ private int outOfServiceOnlyReplicas;
private int underReplicatedInOpenFiles;
private long startTime;
synchronized void set(int underRep,
int onlyRep, int underConstruction) {
- if (!isDecommissionInProgress()) {
+ if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return;
}
underReplicatedBlocks = underRep;
- decommissionOnlyReplicas = onlyRep;
+ outOfServiceOnlyReplicas = onlyRep;
underReplicatedInOpenFiles = underConstruction;
}
/** @return the number of under-replicated blocks */
public synchronized int getUnderReplicatedBlocks() {
- if (!isDecommissionInProgress()) {
+ if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
return underReplicatedBlocks;
}
- /** @return the number of decommission-only replicas */
- public synchronized int getDecommissionOnlyReplicas() {
- if (!isDecommissionInProgress()) {
+ /** @return the number of blocks with out-of-service-only replicas */
+ public synchronized int getOutOfServiceOnlyReplicas() {
+ if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
- return decommissionOnlyReplicas;
+ return outOfServiceOnlyReplicas;
}
/** @return the number of under-replicated blocks in open files */
public synchronized int getUnderReplicatedInOpenFiles() {
- if (!isDecommissionInProgress()) {
+ if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
return underReplicatedInOpenFiles;
}
/** Set start time */
public synchronized void setStartTime(long time) {
+ if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
+ return;
+ }
startTime = time;
}
/** @return start time */
public synchronized long getStartTime() {
- if (!isDecommissionInProgress()) {
+ if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
return 0;
}
return startTime;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 2d6547f..1a47835 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -388,8 +388,8 @@ public class DatanodeManager {
public void sortLocatedBlocks(final String targetHost,
final List<LocatedBlock> locatedBlocks) {
Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
- new DFSUtil.DecomStaleComparator(staleInterval) :
- DFSUtil.DECOM_COMPARATOR;
+ new DFSUtil.ServiceAndStaleComparator(staleInterval) :
+ new DFSUtil.ServiceComparator();
// sort located block
for (LocatedBlock lb : locatedBlocks) {
if (lb.isStriped()) {
@@ -632,9 +632,20 @@ public class DatanodeManager {
* @param nodeInfo datanode descriptor.
*/
private void removeDatanode(DatanodeDescriptor nodeInfo) {
+ removeDatanode(nodeInfo, true);
+ }
+
+ /**
+ * Remove a datanode descriptor; optionally keep its blocks in blocksMap.
+ * @param nodeInfo datanode descriptor.
+ */
+ private void removeDatanode(DatanodeDescriptor nodeInfo,
+ boolean removeBlocksFromBlocksMap) {
assert namesystem.hasWriteLock();
heartbeatManager.removeDatanode(nodeInfo);
- blockManager.removeBlocksAssociatedTo(nodeInfo);
+ if (removeBlocksFromBlocksMap) {
+ blockManager.removeBlocksAssociatedTo(nodeInfo);
+ }
networktopology.remove(nodeInfo);
decrementVersionCount(nodeInfo.getSoftwareVersion());
blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
@@ -655,7 +666,7 @@ public class DatanodeManager {
try {
final DatanodeDescriptor descriptor = getDatanode(node);
if (descriptor != null) {
- removeDatanode(descriptor);
+ removeDatanode(descriptor, true);
} else {
NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
+ node + " does not exist");
@@ -666,7 +677,8 @@ public class DatanodeManager {
}
/** Remove a dead datanode. */
- void removeDeadDatanode(final DatanodeID nodeID) {
+ void removeDeadDatanode(final DatanodeID nodeID,
+ boolean removeBlocksFromBlockMap) {
DatanodeDescriptor d;
try {
d = getDatanode(nodeID);
@@ -675,8 +687,9 @@ public class DatanodeManager {
}
if (d != null && isDatanodeDead(d)) {
NameNode.stateChangeLog.info(
- "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
- removeDatanode(d);
+ "BLOCK* removeDeadDatanode: lost heartbeat from " + d
+ + ", removeBlocksFromBlockMap " + removeBlocksFromBlockMap);
+ removeDatanode(d, removeBlocksFromBlockMap);
}
}
@@ -1112,10 +1125,16 @@ public class DatanodeManager {
}
/**
- * 1. Added to hosts --> no further work needed here.
- * 2. Removed from hosts --> mark AdminState as decommissioned.
- * 3. Added to exclude --> start decommission.
- * 4. Removed from exclude --> stop decommission.
+ * Reload datanode membership and the desired admin operations from
+ * host files. If a node isn't allowed, hostConfigManager.isIncluded returns
+ * false and the node can't be used.
+ * If a node is allowed and the desired admin operation is defined,
+ * it will transition to the desired admin state.
+ * If a node is allowed and upgrade domain is defined,
+ * the upgrade domain will be set on the node.
+ * To use maintenance mode or upgrade domain, set
+ * DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY to
+ * CombinedHostFileManager.class.
*/
private void refreshDatanodes() {
final Map<String, DatanodeDescriptor> copy;
@@ -1125,17 +1144,17 @@ public class DatanodeManager {
for (DatanodeDescriptor node : copy.values()) {
// Check if not include.
if (!hostConfigManager.isIncluded(node)) {
- node.setDisallowed(true); // case 2.
+ node.setDisallowed(true);
} else {
long maintenanceExpireTimeInMS =
hostConfigManager.getMaintenanceExpirationTimeInMS(node);
if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
} else if (hostConfigManager.isExcluded(node)) {
- decomManager.startDecommission(node); // case 3.
+ decomManager.startDecommission(node);
} else {
decomManager.stopMaintenance(node);
- decomManager.stopDecommission(node); // case 4.
+ decomManager.stopDecommission(node);
}
}
node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 87b36da..b1cfd78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -201,7 +201,7 @@ public class DecommissionManager {
LOG.info("Starting decommission of {} {} with {} blocks",
node, storage, storage.numBlocks());
}
- node.decommissioningStatus.setStartTime(monotonicNow());
+ node.getLeavingServiceStatus().setStartTime(monotonicNow());
pendingNodes.add(node);
}
} else {
@@ -222,7 +222,7 @@ public class DecommissionManager {
// extra redundancy blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive()) {
- blockManager.processExtraRedundancyBlocksOnReCommission(node);
+ blockManager.processExtraRedundancyBlocksOnInService(node);
}
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
@@ -246,6 +246,16 @@ public class DecommissionManager {
if (!node.isMaintenance()) {
// Update DN stats maintained by HeartbeatManager
hbManager.startMaintenance(node);
+ // hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
+ if (node.isEnteringMaintenance()) {
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ LOG.info("Starting maintenance of {} {} with {} blocks",
+ node, storage, storage.numBlocks());
+ }
+ node.getLeavingServiceStatus().setStartTime(monotonicNow());
+ }
+ // Track the node regardless of whether it is ENTERING_MAINTENANCE or
+ // IN_MAINTENANCE to support maintenance expiration.
pendingNodes.add(node);
} else {
LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
@@ -264,8 +274,34 @@ public class DecommissionManager {
// Update DN stats maintained by HeartbeatManager
hbManager.stopMaintenance(node);
- // TODO HDFS-9390 remove replicas from block maps
- // or handle over replicated blocks.
+ // Extra redundancy blocks will be detected and processed when
+ // the dead node comes back and sends in its full block report.
+ if (!node.isAlive()) {
+ // The node became dead when it was in maintenance, at which point
+ // the replicas weren't removed from block maps.
+ // When the node leaves maintenance, the replicas should be removed
+ // from the block maps to trigger the necessary replication to
+ // maintain the safety property of "# of live replicas + maintenance
+ // replicas" >= the expected redundancy.
+ blockManager.removeBlocksAssociatedTo(node);
+ } else {
+ // Even though putting nodes in maintenance mode doesn't cause live
+ // replicas to match expected replication factor, it is still possible
+ // to have over-replicated blocks when the node leaves maintenance mode.
+ // First scenario:
+ // a. Node became dead when it is at AdminStates.NORMAL, thus
+ // block is replicated so that 3 replicas exist on other nodes.
+ // b. Admins put the dead node into maintenance mode and then
+ // have the node rejoin the cluster.
+ // c. Take the node out of maintenance mode.
+ // Second scenario:
+ // a. With replication factor 3, put one replica's node in maintenance,
+ // thus block has 1 maintenance replica and 2 live replicas.
+ // b. Change the replication factor to 2. The block will still have
+ // 1 maintenance replica and 2 live replicas.
+ // c. Take the node out of maintenance mode.
+ blockManager.processExtraRedundancyBlocksOnInService(node);
+ }
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
@@ -281,6 +317,11 @@ public class DecommissionManager {
LOG.info("Decommissioning complete for node {}", dn);
}
+ private void setInMaintenance(DatanodeDescriptor dn) {
+ dn.setInMaintenance();
+ LOG.info("Node {} has entered maintenance mode.", dn);
+ }
+
/**
* Checks whether a block is sufficiently replicated/stored for
* decommissioning. For replicated blocks or striped blocks, full-strength
@@ -288,20 +329,21 @@ public class DecommissionManager {
* @return true if sufficient, else false.
*/
private boolean isSufficient(BlockInfo block, BlockCollection bc,
- NumberReplicas numberReplicas) {
- final int numExpected = blockManager.getExpectedRedundancyNum(block);
- final int numLive = numberReplicas.liveReplicas();
- if (numLive >= numExpected
- && blockManager.isPlacementPolicySatisfied(block)) {
+ NumberReplicas numberReplicas, boolean isDecommission) {
+ if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
// Block has enough replica, skip
LOG.trace("Block {} does not need replication.", block);
return true;
}
+ final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
+ numberReplicas);
+ final int numLive = numberReplicas.liveReplicas();
+
// Block is under-replicated
- LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
+ LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
numLive);
- if (numExpected > numLive) {
+ if (isDecommission && numExpected > numLive) {
if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
// Can decom a UC block as long as there will still be minReplicas
if (blockManager.hasMinStorage(block, numLive)) {
@@ -346,11 +388,16 @@ public class DecommissionManager {
+ ", corrupt replicas: " + num.corruptReplicas()
+ ", decommissioned replicas: " + num.decommissioned()
+ ", decommissioning replicas: " + num.decommissioning()
+ + ", maintenance replicas: " + num.maintenanceReplicas()
+ + ", live entering maintenance replicas: "
+ + num.liveEnteringMaintenanceReplicas()
+ ", excess replicas: " + num.excessReplicas()
+ ", Is Open File: " + bc.isUnderConstruction()
+ ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ srcNode + ", Is current datanode decommissioning: "
- + srcNode.isDecommissionInProgress());
+ + srcNode.isDecommissionInProgress() +
+ ", Is current datanode entering maintenance: "
+ + srcNode.isEnteringMaintenance());
}
@VisibleForTesting
@@ -424,7 +471,7 @@ public class DecommissionManager {
numBlocksChecked = 0;
numBlocksCheckedPerLock = 0;
numNodesChecked = 0;
- // Check decom progress
+ // Check decommission or maintenance progress.
namesystem.writeLock();
try {
processPendingNodes();
@@ -464,15 +511,14 @@ public class DecommissionManager {
final DatanodeDescriptor dn = entry.getKey();
AbstractList<BlockInfo> blocks = entry.getValue();
boolean fullScan = false;
- if (dn.isMaintenance()) {
- // TODO HDFS-9390 make sure blocks are minimally replicated
- // before transitioning the node to IN_MAINTENANCE state.
-
+ if (dn.isMaintenance() && dn.maintenanceExpired()) {
// If maintenance expires, stop tracking it.
- if (dn.maintenanceExpired()) {
- stopMaintenance(dn);
- toRemove.add(dn);
- }
+ stopMaintenance(dn);
+ toRemove.add(dn);
+ continue;
+ }
+ if (dn.isInMaintenance()) {
+ // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
continue;
}
if (blocks == null) {
@@ -487,7 +533,7 @@ public class DecommissionManager {
} else {
// This is a known datanode, check if its # of insufficiently
// replicated blocks has dropped to zero and if it can be decommed
- LOG.debug("Processing decommission-in-progress node {}", dn);
+ LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
pruneReliableBlocks(dn, blocks);
}
if (blocks.size() == 0) {
@@ -506,22 +552,31 @@ public class DecommissionManager {
// If the full scan is clean AND the node liveness is okay,
// we can finally mark as decommissioned.
final boolean isHealthy =
- blockManager.isNodeHealthyForDecommission(dn);
+ blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
if (blocks.size() == 0 && isHealthy) {
- setDecommissioned(dn);
- toRemove.add(dn);
+ if (dn.isDecommissionInProgress()) {
+ setDecommissioned(dn);
+ toRemove.add(dn);
+ } else if (dn.isEnteringMaintenance()) {
+ // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
+ // track maintenance expiration.
+ setInMaintenance(dn);
+ } else {
+ Preconditions.checkState(false,
+ "A node is in an invalid state!");
+ }
LOG.debug("Node {} is sufficiently replicated and healthy, "
- + "marked as decommissioned.", dn);
+ + "marked as {}.", dn.getAdminState());
} else {
LOG.debug("Node {} {} healthy."
+ " It needs to replicate {} more blocks."
- + " Decommissioning is still in progress.",
- dn, isHealthy? "is": "isn't", blocks.size());
+ + " {} is still in progress.", dn,
+ isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
}
} else {
LOG.debug("Node {} still has {} blocks to replicate "
- + "before it is a candidate to finish decommissioning.",
- dn, blocks.size());
+ + "before it is a candidate to finish {}.",
+ dn, blocks.size(), dn.getAdminState());
}
iterkey = dn;
}
@@ -539,7 +594,7 @@ public class DecommissionManager {
*/
private void pruneReliableBlocks(final DatanodeDescriptor datanode,
AbstractList<BlockInfo> blocks) {
- processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
+ processBlocksInternal(datanode, blocks.iterator(), null, true);
}
/**
@@ -554,7 +609,7 @@ public class DecommissionManager {
private AbstractList<BlockInfo> handleInsufficientlyStored(
final DatanodeDescriptor datanode) {
AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
- processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
+ processBlocksInternal(datanode, datanode.getBlockIterator(),
insufficient, false);
return insufficient;
}
@@ -573,14 +628,14 @@ public class DecommissionManager {
* @param pruneReliableBlocks whether to remove blocks reliable
* enough from the iterator
*/
- private void processBlocksForDecomInternal(
+ private void processBlocksInternal(
final DatanodeDescriptor datanode,
final Iterator<BlockInfo> it,
final List<BlockInfo> insufficientList,
boolean pruneReliableBlocks) {
boolean firstReplicationLog = true;
int lowRedundancyBlocks = 0;
- int decommissionOnlyReplicas = 0;
+ int outOfServiceOnlyReplicas = 0;
int lowRedundancyInOpenFiles = 0;
while (it.hasNext()) {
if (insufficientList == null
@@ -626,21 +681,25 @@ public class DecommissionManager {
// Schedule low redundancy blocks for reconstruction if not already
// pending
- if (blockManager.isNeededReconstruction(block, liveReplicas)) {
+ boolean isDecommission = datanode.isDecommissionInProgress();
+ boolean neededReconstruction = isDecommission ?
+ blockManager.isNeededReconstruction(block, num) :
+ blockManager.isNeededReconstructionForMaintenance(block, num);
+ if (neededReconstruction) {
if (!blockManager.neededReconstruction.contains(block) &&
blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
blockManager.isPopulatingReplQueues()) {
// Process these blocks only when active NN is out of safe mode.
blockManager.neededReconstruction.add(block,
liveReplicas, num.readOnlyReplicas(),
- num.decommissionedAndDecommissioning(),
+ num.outOfServiceReplicas(),
blockManager.getExpectedRedundancyNum(block));
}
}
// Even if the block is without sufficient redundancy,
// it doesn't block decommission if has sufficient redundancy
- if (isSufficient(block, bc, num)) {
+ if (isSufficient(block, bc, num, isDecommission)) {
if (pruneReliableBlocks) {
it.remove();
}
@@ -662,14 +721,13 @@ public class DecommissionManager {
if (bc.isUnderConstruction()) {
lowRedundancyInOpenFiles++;
}
- if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
- decommissionOnlyReplicas++;
+ if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
+ outOfServiceOnlyReplicas++;
}
}
- datanode.decommissioningStatus.set(lowRedundancyBlocks,
- decommissionOnlyReplicas,
- lowRedundancyInOpenFiles);
+ datanode.getLeavingServiceStatus().set(lowRedundancyBlocks,
+ outOfServiceOnlyReplicas, lowRedundancyInOpenFiles);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 082e949..0ae6f0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -130,12 +130,14 @@ class ErasureCodingWork extends BlockReconstructionWork {
// we only need to replicate one internal block to a new rack
int sourceIndex = chooseSource4SimpleReplication();
createReplicationWork(sourceIndex, targets[0]);
- } else if (numberReplicas.decommissioning() > 0 && hasAllInternalBlocks()) {
- List<Integer> decommissioningSources = findDecommissioningSources();
+ } else if ((numberReplicas.decommissioning() > 0 ||
+ numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
+ hasAllInternalBlocks()) {
+ List<Integer> leavingServiceSources = findLeavingServiceSources();
// decommissioningSources.size() should be >= targets.length
- final int num = Math.min(decommissioningSources.size(), targets.length);
+ final int num = Math.min(leavingServiceSources.size(), targets.length);
for (int i = 0; i < num; i++) {
- createReplicationWork(decommissioningSources.get(i), targets[i]);
+ createReplicationWork(leavingServiceSources.get(i), targets[i]);
}
} else {
targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
@@ -162,10 +164,12 @@ class ErasureCodingWork extends BlockReconstructionWork {
}
}
- private List<Integer> findDecommissioningSources() {
+ private List<Integer> findLeavingServiceSources() {
List<Integer> srcIndices = new ArrayList<>();
for (int i = 0; i < getSrcNodes().length; i++) {
- if (getSrcNodes()[i].isDecommissionInProgress()) {
+ if (getSrcNodes()[i].isDecommissionInProgress() ||
+ (getSrcNodes()[i].isEnteringMaintenance() &&
+ getSrcNodes()[i].isAlive())) {
srcIndices.add(i);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index d728ee2..a72ad64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -25,10 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.util.Daemon;
@@ -269,13 +266,19 @@ class HeartbeatManager implements DatanodeStatistics {
if (!node.isAlive()) {
LOG.info("Dead node {} is put in maintenance state immediately.", node);
node.setInMaintenance();
- } else if (node.isDecommissioned()) {
- LOG.info("Decommissioned node " + node + " is put in maintenance state"
- + " immediately.");
- node.setInMaintenance();
} else {
stats.subtract(node);
- node.startMaintenance();
+ if (node.isDecommissioned()) {
+ LOG.info("Decommissioned node " + node + " is put in maintenance state"
+ + " immediately.");
+ node.setInMaintenance();
+ } else if (blockManager.getMinReplicationToBeInMaintenance() == 0) {
+ LOG.info("MinReplicationToBeInMaintenance is set to zero. " + node +
+ " is put in maintenance state" + " immediately.");
+ node.setInMaintenance();
+ } else {
+ node.startMaintenance();
+ }
stats.add(node);
}
}
@@ -352,7 +355,7 @@ class HeartbeatManager implements DatanodeStatistics {
boolean allAlive = false;
while (!allAlive) {
// locate the first dead node.
- DatanodeID dead = null;
+ DatanodeDescriptor dead = null;
// locate the first failed storage that isn't on a dead node.
DatanodeStorageInfo failedStorage = null;
@@ -401,7 +404,7 @@ class HeartbeatManager implements DatanodeStatistics {
// acquire the fsnamesystem lock, and then remove the dead node.
namesystem.writeLock();
try {
- dm.removeDeadDatanode(dead);
+ dm.removeDeadDatanode(dead, !dead.isMaintenance());
} finally {
namesystem.writeUnlock();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
index de8cf4e..3a26f4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
@@ -155,7 +155,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
private int getPriority(BlockInfo block,
int curReplicas,
int readOnlyReplicas,
- int decommissionedReplicas,
+ int outOfServiceReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
if (curReplicas >= expectedReplicas) {
@@ -164,20 +164,20 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
}
if (block.isStriped()) {
BlockInfoStriped sblk = (BlockInfoStriped) block;
- return getPriorityStriped(curReplicas, decommissionedReplicas,
+ return getPriorityStriped(curReplicas, outOfServiceReplicas,
sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
} else {
return getPriorityContiguous(curReplicas, readOnlyReplicas,
- decommissionedReplicas, expectedReplicas);
+ outOfServiceReplicas, expectedReplicas);
}
}
private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
- int decommissionedReplicas, int expectedReplicas) {
+ int outOfServiceReplicas, int expectedReplicas) {
if (curReplicas == 0) {
// If there are zero non-decommissioned replicas but there are
- // some decommissioned replicas, then assign them highest priority
- if (decommissionedReplicas > 0) {
+ // some out of service replicas, then assign them highest priority
+ if (outOfServiceReplicas > 0) {
return QUEUE_HIGHEST_PRIORITY;
}
if (readOnlyReplicas > 0) {
@@ -201,11 +201,11 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
}
}
- private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+ private int getPriorityStriped(int curReplicas, int outOfServiceReplicas,
short dataBlkNum, short parityBlkNum) {
if (curReplicas < dataBlkNum) {
// There are some replicas on decommissioned nodes so it's not corrupted
- if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+ if (curReplicas + outOfServiceReplicas >= dataBlkNum) {
return QUEUE_HIGHEST_PRIORITY;
}
return QUEUE_WITH_CORRUPT_BLOCKS;
@@ -227,18 +227,15 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
*
* @param block a low redundancy block
* @param curReplicas current number of replicas of the block
- * @param decomissionedReplicas the number of decommissioned replicas
+ * @param outOfServiceReplicas the number of out-of-service replicas
* @param expectedReplicas expected number of replicas of the block
* @return true if the block was added to a queue.
*/
synchronized boolean add(BlockInfo block,
- int curReplicas,
- int readOnlyReplicas,
- int decomissionedReplicas,
- int expectedReplicas) {
- assert curReplicas >= 0 : "Negative replicas!";
+ int curReplicas, int readOnlyReplicas,
+ int outOfServiceReplicas, int expectedReplicas) {
final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
- decomissionedReplicas, expectedReplicas);
+ outOfServiceReplicas, expectedReplicas);
if(priorityQueues.get(priLevel).add(block)) {
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
expectedReplicas == 1) {
@@ -257,12 +254,10 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
/** Remove a block from a low redundancy queue. */
synchronized boolean remove(BlockInfo block,
- int oldReplicas,
- int oldReadOnlyReplicas,
- int decommissionedReplicas,
- int oldExpectedReplicas) {
+ int oldReplicas, int oldReadOnlyReplicas,
+ int outOfServiceReplicas, int oldExpectedReplicas) {
final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
- decommissionedReplicas, oldExpectedReplicas);
+ outOfServiceReplicas, oldExpectedReplicas);
boolean removedBlock = remove(block, priLevel);
if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
oldExpectedReplicas == 1 &&
@@ -325,22 +320,22 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
* method call.
* @param block a low redundancy block
* @param curReplicas current number of replicas of the block
- * @param decommissionedReplicas the number of decommissioned replicas
+ * @param outOfServiceReplicas the number of out-of-service replicas
* @param curExpectedReplicas expected number of replicas of the block
* @param curReplicasDelta the change in the replicate count from before
* @param expectedReplicasDelta the change in the expected replica count
* from before
*/
synchronized void update(BlockInfo block, int curReplicas,
- int readOnlyReplicas, int decommissionedReplicas,
- int curExpectedReplicas,
- int curReplicasDelta, int expectedReplicasDelta) {
+ int readOnlyReplicas, int outOfServiceReplicas,
+ int curExpectedReplicas,
+ int curReplicasDelta, int expectedReplicasDelta) {
int oldReplicas = curReplicas-curReplicasDelta;
int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
int curPri = getPriority(block, curReplicas, readOnlyReplicas,
- decommissionedReplicas, curExpectedReplicas);
+ outOfServiceReplicas, curExpectedReplicas);
int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
- decommissionedReplicas, oldExpectedReplicas);
+ outOfServiceReplicas, oldExpectedReplicas);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " +
block +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
index 0198bcc..be984f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
@@ -24,9 +24,11 @@ import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.Store
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.EXCESS;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_FOR_READ;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.REDUNDANT;
import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.STALESTORAGE;
-import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
/**
* A immutable object that stores the number of live replicas and
@@ -41,6 +43,14 @@ public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaSta
READONLY,
DECOMMISSIONING,
DECOMMISSIONED,
+ // We need live ENTERING_MAINTENANCE nodes to continue
+ // to serve read requests while they are being transitioned to live
+ // IN_MAINTENANCE if these are the only replicas left.
+ // MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -
+ // Live ENTERING_MAINTENANCE.
+ MAINTENANCE_NOT_FOR_READ,
+ // Live ENTERING_MAINTENANCE nodes to serve read requests.
+ MAINTENANCE_FOR_READ,
CORRUPT,
// excess replicas already tracked by blockmanager's excess map
EXCESS,
@@ -106,4 +116,20 @@ public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaSta
public int redundantInternalBlocks() {
return (int) get(REDUNDANT);
}
-}
+
+ public int maintenanceNotForReadReplicas() {
+ return (int) get(MAINTENANCE_NOT_FOR_READ);
+ }
+
+ public int maintenanceReplicas() {
+ return (int) (get(MAINTENANCE_NOT_FOR_READ) + get(MAINTENANCE_FOR_READ));
+ }
+
+ public int outOfServiceReplicas() {
+ return maintenanceReplicas() + decommissionedAndDecommissioning();
+ }
+
+ public int liveEnteringMaintenanceReplicas() {
+ return (int)get(MAINTENANCE_FOR_READ);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
index 45dcc8d..005e6d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
@@ -81,7 +81,7 @@ public class StorageTypeStats {
final DatanodeDescriptor node) {
capacityUsed += info.getDfsUsed();
blockPoolUsed += info.getBlockPoolUsed();
- if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ if (node.isInService()) {
capacityTotal += info.getCapacity();
capacityRemaining += info.getRemaining();
} else {
@@ -90,7 +90,7 @@ public class StorageTypeStats {
}
void addNode(final DatanodeDescriptor node) {
- if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ if (node.isInService()) {
nodesInService++;
}
}
@@ -99,7 +99,7 @@ public class StorageTypeStats {
final DatanodeDescriptor node) {
capacityUsed -= info.getDfsUsed();
blockPoolUsed -= info.getBlockPoolUsed();
- if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ if (node.isInService()) {
capacityTotal -= info.getCapacity();
capacityRemaining -= info.getRemaining();
} else {
@@ -108,7 +108,7 @@ public class StorageTypeStats {
}
void subtractNode(final DatanodeDescriptor node) {
- if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ if (node.isInService()) {
nodesInService--;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 563682f..eb870f8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -5462,11 +5462,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
.<String, Object> builder()
.put("xferaddr", node.getXferAddr())
.put("underReplicatedBlocks",
- node.decommissioningStatus.getUnderReplicatedBlocks())
+ node.getLeavingServiceStatus().getUnderReplicatedBlocks())
+ // TODO use another property name for outOfServiceOnlyReplicas.
.put("decommissionOnlyReplicas",
- node.decommissioningStatus.getDecommissionOnlyReplicas())
+ node.getLeavingServiceStatus().getOutOfServiceOnlyReplicas())
.put("underReplicateInOpenFiles",
- node.decommissioningStatus.getUnderReplicatedInOpenFiles())
+ node.getLeavingServiceStatus().getUnderReplicatedInOpenFiles())
.build();
info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
}
@@ -5528,7 +5529,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+ if (!node.isInService()) {
it.remove();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 84b51f6..483663e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -548,6 +548,13 @@
</property>
<property>
+ <name>dfs.namenode.maintenance.replication.min</name>
+ <value>1</value>
+ <description>Minimum live block replication in the presence of maintenance mode.
+ </description>
+</property>
+
+<property>
<name>dfs.namenode.safemode.replication.min</name>
<value></value>
<description>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
index 0698628..534c5e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
@@ -102,6 +102,7 @@ public class AdminStatesBaseTest {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
hostsFileWriter.initialize(conf, "temp/admin");
+
}
@After
@@ -110,17 +111,22 @@ public class AdminStatesBaseTest {
shutdownCluster();
}
- protected void writeFile(FileSystem fileSys, Path name, int repl)
+ static public FSDataOutputStream writeIncompleteFile(FileSystem fileSys,
+ Path name, short repl, short numOfBlocks) throws IOException {
+ return writeFile(fileSys, name, repl, numOfBlocks, false);
+ }
+
+ static protected void writeFile(FileSystem fileSys, Path name, int repl)
throws IOException {
writeFile(fileSys, name, repl, 2);
}
- protected void writeFile(FileSystem fileSys, Path name, int repl,
+ static protected void writeFile(FileSystem fileSys, Path name, int repl,
int numOfBlocks) throws IOException {
writeFile(fileSys, name, repl, numOfBlocks, true);
}
- protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+ static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
int repl, int numOfBlocks, boolean completeFile)
throws IOException {
// create and write a file that contains two blocks of data
@@ -136,6 +142,7 @@ public class AdminStatesBaseTest {
stm.close();
return null;
} else {
+ stm.flush();
// Do not close stream, return it
// so that it is not garbage collected
return stm;
@@ -353,7 +360,7 @@ public class AdminStatesBaseTest {
protected void shutdownCluster() {
if (cluster != null) {
- cluster.shutdown();
+ cluster.shutdown(true);
}
}
@@ -362,12 +369,13 @@ public class AdminStatesBaseTest {
refreshNodes(conf);
}
- protected DatanodeDescriptor getDatanodeDesriptor(
+ static private DatanodeDescriptor getDatanodeDesriptor(
final FSNamesystem ns, final String datanodeUuid) {
return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
}
- protected void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+ static public void cleanupFile(FileSystem fileSys, Path name)
+ throws IOException {
assertTrue(fileSys.exists(name));
fileSys.delete(name, true);
assertTrue(!fileSys.exists(name));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b61fb267/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index ddb8237..6ca1e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -484,7 +484,7 @@ public class TestDecommission extends AdminStatesBaseTest {
shutdownCluster();
}
}
-
+
/**
* Tests cluster storage statistics during decommissioning for non
* federated cluster
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org