Posted to common-commits@hadoop.apache.org by ma...@apache.org on 2017/08/02 21:28:24 UTC
[2/2] hadoop git commit: HDFS-9388. Decommission related code to support Maintenance State for datanodes.
HDFS-9388. Decommission related code to support Maintenance State for datanodes.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79df1e75
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79df1e75
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79df1e75
Branch: refs/heads/trunk
Commit: 79df1e750ef558afed6d166ce225a23061b36aed
Parents: 12e44e7
Author: Manoj Govindassamy <ma...@apache.org>
Authored: Wed Aug 2 14:22:41 2017 -0700
Committer: Manoj Govindassamy <ma...@apache.org>
Committed: Wed Aug 2 14:22:46 2017 -0700
----------------------------------------------------------------------
.../blockmanagement/DatanodeAdminManager.java | 756 +++++++++++++++++++
.../server/blockmanagement/DatanodeManager.java | 30 +-
.../blockmanagement/DecommissionManager.java | 741 ------------------
.../hadoop/hdfs/server/namenode/BackupNode.java | 2 +-
.../src/main/resources/hdfs-default.xml | 21 +-
.../apache/hadoop/hdfs/TestDecommission.java | 44 +-
.../blockmanagement/BlockManagerTestUtil.java | 2 +-
...constructStripedBlocksWithRackAwareness.java | 5 +-
.../TestReplicationPolicyConsiderLoad.java | 2 +-
.../namenode/TestDecommissioningStatus.java | 6 +-
.../TestDefaultBlockPlacementPolicy.java | 4 +-
.../hadoop/hdfs/server/namenode/TestFsck.java | 13 +-
.../server/namenode/TestNameNodeMXBean.java | 6 +-
.../namenode/TestNamenodeCapacityReport.java | 8 +-
14 files changed, 833 insertions(+), 807 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
new file mode 100644
index 0000000..928036a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeAdminManager.java
@@ -0,0 +1,756 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+import java.util.AbstractList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.util.ChunkedArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Manages decommissioning and maintenance state for DataNodes. A background
+ * monitor thread periodically checks the status of DataNodes that are
+ * decommissioning or entering maintenance state.
+ * <p/>
+ * A DataNode can be decommissioned in a few situations:
+ * <ul>
+ * <li>If a DN is dead, it is decommissioned immediately.</li>
+ * <li>If a DN is alive, it is decommissioned after all of its blocks
+ * are sufficiently replicated. Merely under-replicated blocks do not
+ * block decommissioning as long as they are above a replication
+ * threshold.</li>
+ * </ul>
+ * In the second case, the DataNode transitions to a DECOMMISSION_INPROGRESS
+ * state and is tracked by the monitor thread. The monitor periodically scans
+ * through the list of insufficiently replicated blocks on these DataNodes to
+ * determine if they can be DECOMMISSIONED. The monitor also prunes this list
+ * as blocks become replicated, so monitor scans will become more efficient
+ * over time.
+ * <p/>
+ * DECOMMISSION_INPROGRESS nodes that become dead do not progress to
+ * DECOMMISSIONED until they become live again. This prevents potential
+ * durability loss for singly-replicated blocks (see HDFS-6791).
+ * <p/>
+ * DataNodes can also be put into maintenance state for short-duration
+ * maintenance operations. Unlike decommissioning, blocks are not always
+ * re-replicated for a DataNode to enter maintenance state. If its blocks
+ * are already replicated at least dfs.namenode.maintenance.replication.min
+ * times, the DataNode transitions directly to IN_MAINTENANCE state.
+ * Otherwise, just like decommissioning, it transitions to
+ * ENTERING_MAINTENANCE state, waits for its blocks to be sufficiently
+ * replicated, and then transitions to IN_MAINTENANCE state. The block
+ * replication requirement is relaxed until the maintenance expiry time at
+ * the latest. If a DataNode has not transitioned back or rejoined the
+ * cluster by the expiry time, its blocks are re-replicated just as in the
+ * decommissioning case, so as to avoid read or write performance
+ * degradation.
+ * <p/>
+ * This class depends on the FSNamesystem lock for synchronization.
+ */
+@InterfaceAudience.Private
+public class DatanodeAdminManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DatanodeAdminManager.class);
+ private final Namesystem namesystem;
+ private final BlockManager blockManager;
+ private final HeartbeatManager hbManager;
+ private final ScheduledExecutorService executor;
+
+ /**
+ * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
+ * datanodes that are being tracked so they can be marked as
+ * DECOMMISSIONED or IN_MAINTENANCE. Even after a node is marked as
+ * IN_MAINTENANCE, it remains in the map until its maintenance expiration
+ * is checked during a monitor tick.
+ * <p/>
+ * This holds a set of references to the under-replicated blocks on the DN
+ * at the time the DN is added to the map, i.e. the blocks that are
+ * preventing the node from being marked as decommissioned. During a
+ * monitor tick, this list is pruned as blocks become replicated.
+ * <p/>
+ * Note also that the reference to the list of under-replicated blocks
+ * will be null on initial add.
+ * <p/>
+ * However, this map can become out-of-date since it is not updated by
+ * block reports or other events. Before a node is finally marked as
+ * decommissioned, another check is done against the actual block map.
+ */
+ private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
+ outOfServiceNodeBlocks;
+
+ /**
+ * Tracking a node in outOfServiceNodeBlocks consumes additional memory. To
+ * limit the impact on NN memory consumption, we limit the number of nodes in
+ * outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
+ */
+ private final Queue<DatanodeDescriptor> pendingNodes;
+ private Monitor monitor = null;
+
+ DatanodeAdminManager(final Namesystem namesystem,
+ final BlockManager blockManager, final HeartbeatManager hbManager) {
+ this.namesystem = namesystem;
+ this.blockManager = blockManager;
+ this.hbManager = hbManager;
+
+ executor = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setNameFormat("DatanodeAdminMonitor-%d")
+ .setDaemon(true).build());
+ outOfServiceNodeBlocks = new TreeMap<>();
+ pendingNodes = new LinkedList<>();
+ }
+
+ /**
+ * Start the DataNode admin monitor thread.
+ * @param conf the Configuration from which monitor settings are read
+ */
+ void activate(Configuration conf) {
+ final int intervalSecs = (int) conf.getTimeDuration(
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT,
+ TimeUnit.SECONDS);
+ checkArgument(intervalSecs >= 0, "Cannot set a negative " +
+ "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
+
+ int blocksPerInterval = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
+
+ final String deprecatedKey =
+ "dfs.namenode.decommission.nodes.per.interval";
+ final String strNodes = conf.get(deprecatedKey);
+ if (strNodes != null) {
+ LOG.warn("Deprecated configuration key {} will be ignored.",
+ deprecatedKey);
+ LOG.warn("Please update your configuration to use {} instead.",
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+ }
+
+ checkArgument(blocksPerInterval > 0,
+ "Must set a positive value for "
+ + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+
+ final int maxConcurrentTrackedNodes = conf.getInt(
+ DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+ DFSConfigKeys
+ .DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
+ checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
+ "value for "
+ + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
+
+ monitor = new Monitor(blocksPerInterval, maxConcurrentTrackedNodes);
+ executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
+ TimeUnit.SECONDS);
+
+ LOG.debug("Activating DatanodeAdminManager with interval {} seconds, " +
+ "{} max blocks per interval, " +
+ "{} max concurrently tracked nodes.", intervalSecs,
+ blocksPerInterval, maxConcurrentTrackedNodes);
+ }
+
+ /**
+ * Stop the admin monitor thread, waiting briefly for it to terminate.
+ */
+ void close() {
+ executor.shutdownNow();
+ try {
+ executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {}
+ }
+
+ /**
+ * Start decommissioning the specified datanode.
+ * @param node the datanode to decommission
+ */
+ @VisibleForTesting
+ public void startDecommission(DatanodeDescriptor node) {
+ if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
+ // Update DN stats maintained by HeartbeatManager
+ hbManager.startDecommission(node);
+ // hbManager.startDecommission will set a dead node straight to DECOMMISSIONED.
+ if (node.isDecommissionInProgress()) {
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ LOG.info("Starting decommission of {} {} with {} blocks",
+ node, storage, storage.numBlocks());
+ }
+ node.getLeavingServiceStatus().setStartTime(monotonicNow());
+ pendingNodes.add(node);
+ }
+ } else {
+ LOG.trace("startDecommission: Node {} in {}, nothing to do." +
+ node, node.getAdminState());
+ }
+ }
+
+ /**
+ * Stop decommissioning the specified datanode.
+ * @param node the datanode whose decommissioning should be stopped
+ */
+ @VisibleForTesting
+ public void stopDecommission(DatanodeDescriptor node) {
+ if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+ // Update DN stats maintained by HeartbeatManager
+ hbManager.stopDecommission(node);
+ // Extra-redundancy blocks will be detected and processed when
+ // the dead node comes back and sends in its full block report.
+ if (node.isAlive()) {
+ blockManager.processExtraRedundancyBlocksOnInService(node);
+ }
+ // Remove from tracking in DatanodeAdminManager
+ pendingNodes.remove(node);
+ outOfServiceNodeBlocks.remove(node);
+ } else {
+ LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
+ node, node.getAdminState());
+ }
+ }
+
+ /**
+ * Start maintenance of the specified datanode.
+ * @param node the datanode to put into maintenance
+ * @param maintenanceExpireTimeInMS the time, in milliseconds, at which
+ * the maintenance window expires
+ */
+ @VisibleForTesting
+ public void startMaintenance(DatanodeDescriptor node,
+ long maintenanceExpireTimeInMS) {
+ // Even if the node is already in maintenance, we still need to adjust
+ // the expiration time.
+ node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
+ if (!node.isMaintenance()) {
+ // Update DN stats maintained by HeartbeatManager
+ hbManager.startMaintenance(node);
+ // hbManager.startMaintenance will set a dead node straight to IN_MAINTENANCE.
+ if (node.isEnteringMaintenance()) {
+ for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+ LOG.info("Starting maintenance of {} {} with {} blocks",
+ node, storage, storage.numBlocks());
+ }
+ node.getLeavingServiceStatus().setStartTime(monotonicNow());
+ }
+ // Track the node regardless of whether it is ENTERING_MAINTENANCE or
+ // IN_MAINTENANCE to support maintenance expiration.
+ pendingNodes.add(node);
+ } else {
+ LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
+ node, node.getAdminState());
+ }
+ }
+
+
+ /**
+ * Stop maintenance of the specified datanode.
+ * @param node the datanode leaving maintenance
+ */
+ @VisibleForTesting
+ public void stopMaintenance(DatanodeDescriptor node) {
+ if (node.isMaintenance()) {
+ // Update DN stats maintained by HeartbeatManager
+ hbManager.stopMaintenance(node);
+
+ // Extra-redundancy blocks will be detected and processed when
+ // the dead node comes back and sends in its full block report.
+ if (!node.isAlive()) {
+ // The node became dead when it was in maintenance, at which point
+ // the replicas weren't removed from block maps.
+ // When the node leaves maintenance, the replicas should be removed
+ // from the block maps to trigger the necessary replication to
+ // maintain the safety property of "# of live replicas + maintenance
+ // replicas" >= the expected redundancy.
+ blockManager.removeBlocksAssociatedTo(node);
+ } else {
+ // Even though putting a node into maintenance doesn't cause live
+ // replicas to match the expected replication factor, it is still
+ // possible to have over-replicated blocks when the node leaves
+ // maintenance.
+ // First scenario:
+ // a. The node became dead while at AdminStates.NORMAL, so its
+ // blocks were re-replicated and 3 replicas exist on other nodes.
+ // b. Admins put the dead node into maintenance mode and then
+ // have the node rejoin the cluster.
+ // c. Take the node out of maintenance mode.
+ // Second scenario:
+ // a. With replication factor 3, one replica is on a maintenance
+ // node, so the block has 1 maintenance replica and 2 live replicas.
+ // b. Change the replication factor to 2. The block still has
+ // 1 maintenance replica and 2 live replicas.
+ // c. Take the node out of maintenance mode.
+ blockManager.processExtraRedundancyBlocksOnInService(node);
+ }
+
+ // Remove from tracking in DatanodeAdminManager
+ pendingNodes.remove(node);
+ outOfServiceNodeBlocks.remove(node);
+ } else {
+ LOG.trace("stopMaintenance: Node {} in {}, nothing to do." +
+ node, node.getAdminState());
+ }
+ }
+
+ private void setDecommissioned(DatanodeDescriptor dn) {
+ dn.setDecommissioned();
+ LOG.info("Decommissioning complete for node {}", dn);
+ }
+
+ private void setInMaintenance(DatanodeDescriptor dn) {
+ dn.setInMaintenance();
+ LOG.info("Node {} has entered maintenance mode.", dn);
+ }
+
+ /**
+ * Checks whether a block is sufficiently replicated/stored for
+ * DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE datanodes. For replicated
+ * blocks or striped blocks, full-strength replication or storage is not
+ * always necessary, hence "sufficient".
+ * @return true if sufficient, else false.
+ */
+ private boolean isSufficient(BlockInfo block, BlockCollection bc,
+ NumberReplicas numberReplicas, boolean isDecommission) {
+ if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
+ // Block has enough replicas, skip.
+ LOG.trace("Block {} does not need replication.", block);
+ return true;
+ }
+
+ final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
+ numberReplicas);
+ final int numLive = numberReplicas.liveReplicas();
+
+ // Block is under-replicated
+ LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
+ numLive);
+ if (isDecommission && numExpected > numLive) {
+ if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
+ // Can decom a UC block as long as there will still be minReplicas
+ if (blockManager.hasMinStorage(block, numLive)) {
+ LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+ + ">= minR ({})", block, numLive,
+ blockManager.getMinStorageNum(block));
+ return true;
+ } else {
+ LOG.trace("UC block {} insufficiently-replicated since numLive "
+ + "({}) < minR ({})", block, numLive,
+ blockManager.getMinStorageNum(block));
+ }
+ } else {
+ // Can decom a non-UC block as long as the default replication is met
+ if (numLive >= blockManager.getDefaultStorageNum(block)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private void logBlockReplicationInfo(BlockInfo block,
+ BlockCollection bc,
+ DatanodeDescriptor srcNode, NumberReplicas num,
+ Iterable<DatanodeStorageInfo> storages) {
+ if (!NameNode.blockStateChangeLog.isInfoEnabled()) {
+ return;
+ }
+
+ int curReplicas = num.liveReplicas();
+ int curExpectedRedundancy = blockManager.getExpectedRedundancyNum(block);
+ StringBuilder nodeList = new StringBuilder();
+ for (DatanodeStorageInfo storage : storages) {
+ final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+ nodeList.append(node);
+ nodeList.append(" ");
+ }
+ NameNode.blockStateChangeLog.info(
+ "Block: " + block + ", Expected Replicas: "
+ + curExpectedRedundancy + ", live replicas: " + curReplicas
+ + ", corrupt replicas: " + num.corruptReplicas()
+ + ", decommissioned replicas: " + num.decommissioned()
+ + ", decommissioning replicas: " + num.decommissioning()
+ + ", maintenance replicas: " + num.maintenanceReplicas()
+ + ", live entering maintenance replicas: "
+ + num.liveEnteringMaintenanceReplicas()
+ + ", excess replicas: " + num.excessReplicas()
+ + ", Is Open File: " + bc.isUnderConstruction()
+ + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+ + srcNode + ", Is current datanode decommissioning: "
+ + srcNode.isDecommissionInProgress() +
+ ", Is current datanode entering maintenance: "
+ + srcNode.isEnteringMaintenance());
+ }
+
+ @VisibleForTesting
+ public int getNumPendingNodes() {
+ return pendingNodes.size();
+ }
+
+ @VisibleForTesting
+ public int getNumTrackedNodes() {
+ return outOfServiceNodeBlocks.size();
+ }
+
+ @VisibleForTesting
+ public int getNumNodesChecked() {
+ return monitor.numNodesChecked;
+ }
+
+ /**
+ * Checks to see if datanodes have finished DECOMMISSION_INPROGRESS or
+ * ENTERING_MAINTENANCE state.
+ * <p/>
+ * Since this is done while holding the namesystem lock,
+ * the amount of work per monitor tick is limited.
+ */
+ private class Monitor implements Runnable {
+ /**
+ * The maximum number of blocks to check per tick.
+ */
+ private final int numBlocksPerCheck;
+ /**
+ * The maximum number of nodes to track in outOfServiceNodeBlocks.
+ * A value of 0 means no limit.
+ */
+ private final int maxConcurrentTrackedNodes;
+ /**
+ * The number of blocks that have been checked on this tick.
+ */
+ private int numBlocksChecked = 0;
+ /**
+ * The number of blocks checked after (re)holding lock.
+ */
+ private int numBlocksCheckedPerLock = 0;
+ /**
+ * The number of nodes that have been checked on this tick. Used for
+ * statistics.
+ */
+ private int numNodesChecked = 0;
+ /**
+ * The last datanode in outOfServiceNodeBlocks that we've processed.
+ */
+ private DatanodeDescriptor iterkey = new DatanodeDescriptor(
+ new DatanodeID("", "", "", 0, 0, 0, 0));
+
+ Monitor(int numBlocksPerCheck, int maxConcurrentTrackedNodes) {
+ this.numBlocksPerCheck = numBlocksPerCheck;
+ this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
+ }
+
+ private boolean exceededNumBlocksPerCheck() {
+ LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
+ return numBlocksChecked >= numBlocksPerCheck;
+ }
+
+ @Override
+ public void run() {
+ if (!namesystem.isRunning()) {
+ LOG.info("Namesystem is not running, skipping " +
+ "decommissioning/maintenance checks.");
+ return;
+ }
+ // Reset the checked count at beginning of each iteration
+ numBlocksChecked = 0;
+ numBlocksCheckedPerLock = 0;
+ numNodesChecked = 0;
+ // Check decommission or maintenance progress.
+ namesystem.writeLock();
+ try {
+ processPendingNodes();
+ check();
+ } finally {
+ namesystem.writeUnlock();
+ }
+ if (numBlocksChecked + numNodesChecked > 0) {
+ LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
+ numNodesChecked);
+ }
+ }
+
+ /**
+ * Pop datanodes off the pending list and into outOfServiceNodeBlocks,
+ * subject to the maxConcurrentTrackedNodes limit.
+ */
+ private void processPendingNodes() {
+ while (!pendingNodes.isEmpty() &&
+ (maxConcurrentTrackedNodes == 0 ||
+ outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
+ outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
+ }
+ }
+
+ private void check() {
+ final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
+ it = new CyclicIteration<>(outOfServiceNodeBlocks,
+ iterkey).iterator();
+ final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
+
+ while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
+ .isRunning()) {
+ numNodesChecked++;
+ final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
+ entry = it.next();
+ final DatanodeDescriptor dn = entry.getKey();
+ AbstractList<BlockInfo> blocks = entry.getValue();
+ boolean fullScan = false;
+ if (dn.isMaintenance() && dn.maintenanceExpired()) {
+ // If maintenance expires, stop tracking it.
+ stopMaintenance(dn);
+ toRemove.add(dn);
+ continue;
+ }
+ if (dn.isInMaintenance()) {
+ // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
+ continue;
+ }
+ if (blocks == null) {
+ // This is a newly added datanode, run through its list to schedule
+ // under-replicated blocks for replication and collect the blocks
+ // that are insufficiently replicated for further tracking
+ LOG.debug("Newly-added node {}, doing full scan to find " +
+ "insufficiently-replicated blocks.", dn);
+ blocks = handleInsufficientlyStored(dn);
+ outOfServiceNodeBlocks.put(dn, blocks);
+ fullScan = true;
+ } else {
+ // This is a known datanode, check if its # of insufficiently
+ // replicated blocks has dropped to zero and if it can move
+ // to the next state.
+ LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
+ pruneReliableBlocks(dn, blocks);
+ }
+ if (blocks.size() == 0) {
+ if (!fullScan) {
+ // If we didn't just do a full scan, need to re-check with the
+ // full block map.
+ //
+ // We've replicated all the known insufficiently replicated
+ // blocks. Re-check with the full block map before finally
+ // marking the datanode as DECOMMISSIONED or IN_MAINTENANCE.
+ LOG.debug("Node {} has finished replicating current set of "
+ + "blocks, checking with the full block map.", dn);
+ blocks = handleInsufficientlyStored(dn);
+ outOfServiceNodeBlocks.put(dn, blocks);
+ }
+ // If the full scan is clean AND the node liveness is okay,
+ // we can finally mark as DECOMMISSIONED or IN_MAINTENANCE.
+ final boolean isHealthy =
+ blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
+ if (blocks.size() == 0 && isHealthy) {
+ if (dn.isDecommissionInProgress()) {
+ setDecommissioned(dn);
+ toRemove.add(dn);
+ } else if (dn.isEnteringMaintenance()) {
+ // An IN_MAINTENANCE node remains in outOfServiceNodeBlocks
+ // to track maintenance expiration.
+ setInMaintenance(dn);
+ } else {
+ Preconditions.checkState(false,
+ "A node is in an invalid state!");
+ }
+ LOG.debug("Node {} is sufficiently replicated and healthy, "
+ + "marked as {}.", dn.getAdminState());
+ } else {
+ LOG.debug("Node {} {} healthy."
+ + " It needs to replicate {} more blocks."
+ + " {} is still in progress.", dn,
+ isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
+ }
+ } else {
+ LOG.debug("Node {} still has {} blocks to replicate "
+ + "before it is a candidate to finish {}.",
+ dn, blocks.size(), dn.getAdminState());
+ }
+ iterkey = dn;
+ }
+ // Remove the datanodes that are DECOMMISSIONED or in service after
+ // maintenance expiration.
+ for (DatanodeDescriptor dn : toRemove) {
+ Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
+ "Removing a node that is not yet decommissioned or in service!");
+ outOfServiceNodeBlocks.remove(dn);
+ }
+ }
+
+ /**
+ * Removes reliable blocks from the block list of a datanode.
+ */
+ private void pruneReliableBlocks(final DatanodeDescriptor datanode,
+ AbstractList<BlockInfo> blocks) {
+ processBlocksInternal(datanode, blocks.iterator(), null, true);
+ }
+
+ /**
+ * Returns a list of blocks on a datanode that are insufficiently
+ * replicated or require recovery, i.e. requiring recovery and
+ * should prevent decommission or maintenance.
+ * <p/>
+ * As part of this, it also schedules replication/recovery work.
+ *
+ * @return List of blocks requiring recovery
+ */
+ private AbstractList<BlockInfo> handleInsufficientlyStored(
+ final DatanodeDescriptor datanode) {
+ AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
+ processBlocksInternal(datanode, datanode.getBlockIterator(),
+ insufficient, false);
+ return insufficient;
+ }
+
+ /**
+ * Used while checking if DECOMMISSION_INPROGRESS datanodes can be
+ * marked as DECOMMISSIONED or ENTERING_MAINTENANCE datanodes can be
+ * marked as IN_MAINTENANCE. Combines shared logic of pruneReliableBlocks
+ * and handleInsufficientlyStored.
+ *
+ * @param datanode Datanode
+ * @param it Iterator over the blocks on the
+ * datanode
+ * @param insufficientList Return parameter. If it's not null,
+ * will contain the insufficiently
+ * replicated blocks from the list.
+ * @param pruneReliableBlocks whether to remove blocks reliable
+ * enough from the iterator
+ */
+ private void processBlocksInternal(
+ final DatanodeDescriptor datanode,
+ final Iterator<BlockInfo> it,
+ final List<BlockInfo> insufficientList,
+ boolean pruneReliableBlocks) {
+ boolean firstReplicationLog = true;
+ // Low redundancy in UC Blocks only
+ int lowRedundancyInOpenFiles = 0;
+ // All low redundancy blocks. Includes lowRedundancyInOpenFiles.
+ int lowRedundancyBlocks = 0;
+ // All maintenance and decommission replicas.
+ int outOfServiceOnlyReplicas = 0;
+ while (it.hasNext()) {
+ if (insufficientList == null
+ && numBlocksCheckedPerLock >= numBlocksPerCheck) {
+ // During a full scan, insufficientList will NOT be null and the
+ // iterator will be the DN's own block iterator, so the lock must
+ // not be yielded; otherwise a ConcurrentModificationException
+ // could occur.
+ // Once the full scan is done, the iterator is over a copy, so the
+ // lock can be yielded.
+ // Yielding is required when the number of blocks is greater than
+ // the configured per-iteration limit.
+ namesystem.writeUnlock();
+ try {
+ LOG.debug("Yielded lock during decommission/maintenance check");
+ Thread.sleep(0, 500);
+ } catch (InterruptedException ignored) {
+ return;
+ }
+ // reset
+ numBlocksCheckedPerLock = 0;
+ namesystem.writeLock();
+ }
+ numBlocksChecked++;
+ numBlocksCheckedPerLock++;
+ final BlockInfo block = it.next();
+ // Remove the block from the list if it's no longer in the block map,
+ // e.g. the containing file has been deleted
+ if (blockManager.blocksMap.getStoredBlock(block) == null) {
+ LOG.trace("Removing unknown block {}", block);
+ it.remove();
+ continue;
+ }
+
+ long bcId = block.getBlockCollectionId();
+ if (bcId == INodeId.INVALID_INODE_ID) {
+ // Orphan block, will be invalidated eventually. Skip.
+ continue;
+ }
+
+ final BlockCollection bc = blockManager.getBlockCollection(block);
+ final NumberReplicas num = blockManager.countNodes(block);
+ final int liveReplicas = num.liveReplicas();
+
+ // Schedule low redundancy blocks for reconstruction
+ // if not already pending.
+ boolean isDecommission = datanode.isDecommissionInProgress();
+ boolean neededReconstruction = isDecommission ?
+ blockManager.isNeededReconstruction(block, num) :
+ blockManager.isNeededReconstructionForMaintenance(block, num);
+ if (neededReconstruction) {
+ if (!blockManager.neededReconstruction.contains(block) &&
+ blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
+ blockManager.isPopulatingReplQueues()) {
+ // Process these blocks only when active NN is out of safe mode.
+ blockManager.neededReconstruction.add(block,
+ liveReplicas, num.readOnlyReplicas(),
+ num.outOfServiceReplicas(),
+ blockManager.getExpectedRedundancyNum(block));
+ }
+ }
+
+ // Even if the block is under-replicated overall, it might not
+ // block decommission/maintenance if it is still sufficiently
+ // replicated/stored for this node to leave service.
+ if (isSufficient(block, bc, num, isDecommission)) {
+ if (pruneReliableBlocks) {
+ it.remove();
+ }
+ continue;
+ }
+
+ // We've found a block without sufficient redundancy.
+ if (insufficientList != null) {
+ insufficientList.add(block);
+ }
+ // Log if this is our first time through
+ if (firstReplicationLog) {
+ logBlockReplicationInfo(block, bc, datanode, num,
+ blockManager.blocksMap.getStorages(block));
+ firstReplicationLog = false;
+ }
+ // Update various counts
+ lowRedundancyBlocks++;
+ if (bc.isUnderConstruction()) {
+ lowRedundancyInOpenFiles++;
+ }
+ if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
+ outOfServiceOnlyReplicas++;
+ }
+ }
+
+ datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles,
+ lowRedundancyBlocks, outOfServiceOnlyReplicas);
+ }
+ }
+
+ @VisibleForTesting
+ void runMonitorForTest() throws ExecutionException, InterruptedException {
+ executor.submit(monitor).get();
+ }
+}
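The @VisibleForTesting hooks above (startDecommission, startMaintenance, getNumPendingNodes, getNumTrackedNodes, runMonitorForTest) let tests drive the admin flow without waiting on the scheduled executor. A minimal sketch, not part of this commit, assuming a running MiniDFSCluster and an already registered DatanodeDescriptor; it must live in the org.apache.hadoop.hdfs.server.blockmanagement package (as BlockManagerTestUtil does) because runMonitorForTest() is package-private:

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class DatanodeAdminManagerSketch {
      // Drives one synchronous monitor pass for a decommissioning node.
      static void decommissionAndCheck(MiniDFSCluster cluster,
          DatanodeDescriptor dn) throws Exception {
        final DatanodeAdminManager adminManager = cluster.getNamesystem()
            .getBlockManager().getDatanodeManager().getDatanodeAdminManager();

        // A live node moves to DECOMMISSION_INPROGRESS and waits in
        // pendingNodes until a monitor tick picks it up.
        adminManager.startDecommission(dn);
        assert adminManager.getNumPendingNodes() == 1;

        // Run one monitor pass synchronously: pendingNodes drains into
        // outOfServiceNodeBlocks and each tracked node's blocks are checked.
        adminManager.runMonitorForTest();

        // The node ends up DECOMMISSIONED once all of its blocks are
        // sufficiently replicated and it is healthy; until then it stays
        // tracked. startMaintenance(dn, expiryTimeMs) follows the same
        // pending -> tracked path for ENTERING_MAINTENANCE/IN_MAINTENANCE.
      }
    }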
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 2c5779a..d705fec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -75,7 +75,7 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
- private final DecommissionManager decomManager;
+ private final DatanodeAdminManager datanodeAdminManager;
private final HeartbeatManager heartbeatManager;
private final FSClusterStats fsClusterStats;
@@ -223,9 +223,10 @@ public class DatanodeManager {
networktopology = NetworkTopology.getInstance(conf);
}
- this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
- this.decomManager = new DecommissionManager(namesystem, blockManager,
- heartbeatManager);
+ this.heartbeatManager = new HeartbeatManager(namesystem,
+ blockManager, conf);
+ this.datanodeAdminManager = new DatanodeAdminManager(namesystem,
+ blockManager, heartbeatManager);
this.fsClusterStats = newFSClusterStats();
this.dataNodePeerStatsEnabled = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY,
@@ -372,12 +373,12 @@ public class DatanodeManager {
}
void activate(final Configuration conf) {
- decomManager.activate(conf);
+ datanodeAdminManager.activate(conf);
heartbeatManager.activate();
}
void close() {
- decomManager.close();
+ datanodeAdminManager.close();
heartbeatManager.close();
}
@@ -392,8 +393,8 @@ public class DatanodeManager {
}
@VisibleForTesting
- public DecommissionManager getDecomManager() {
- return decomManager;
+ public DatanodeAdminManager getDatanodeAdminManager() {
+ return datanodeAdminManager;
}
public HostConfigManager getHostConfigManager() {
@@ -991,9 +992,9 @@ public class DatanodeManager {
hostConfigManager.getMaintenanceExpirationTimeInMS(nodeReg);
// If the registered node is in exclude list, then decommission it
if (getHostConfigManager().isExcluded(nodeReg)) {
- decomManager.startDecommission(nodeReg);
+ datanodeAdminManager.startDecommission(nodeReg);
} else if (nodeReg.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
- decomManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
+ datanodeAdminManager.startMaintenance(nodeReg, maintenanceExpireTimeInMS);
}
}
@@ -1219,12 +1220,13 @@ public class DatanodeManager {
long maintenanceExpireTimeInMS =
hostConfigManager.getMaintenanceExpirationTimeInMS(node);
if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
- decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
+ datanodeAdminManager.startMaintenance(
+ node, maintenanceExpireTimeInMS);
} else if (hostConfigManager.isExcluded(node)) {
- decomManager.startDecommission(node);
+ datanodeAdminManager.startDecommission(node);
} else {
- decomManager.stopMaintenance(node);
- decomManager.stopDecommission(node);
+ datanodeAdminManager.stopMaintenance(node);
+ datanodeAdminManager.stopDecommission(node);
}
}
node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));
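The DatanodeManager changes above only swap the delegate; the tunables consumed by datanodeAdminManager.activate(conf) are unchanged. A hedged sketch of setting the three knobs that activate() reads, using the key constants from DFSConfigKeys as in the new file above (the values shown are illustrative, not recommendations):

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class AdminMonitorConfigSketch {
      static Configuration adminMonitorConf() {
        Configuration conf = new HdfsConfiguration();
        // Monitor tick interval; activate() reads it via getTimeDuration
        // in seconds, and rejects negative values.
        conf.setTimeDuration(
            DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
            30, TimeUnit.SECONDS);
        // Max blocks scanned per tick; must be positive.
        conf.setInt(
            DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
            500000);
        // Cap on concurrently tracked nodes; 0 means no limit, and any
        // overflow waits in pendingNodes.
        conf.setInt(
            DFSConfigKeys
                .DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
            100);
        // dfs.namenode.decommission.nodes.per.interval is deprecated and
        // is now ignored with a warning.
        return conf;
      }
    }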
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
deleted file mode 100644
index ae79826..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ /dev/null
@@ -1,741 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.util.Time.monotonicNow;
-
-import java.util.AbstractList;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.TreeMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.server.namenode.INodeId;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.util.CyclicIteration;
-import org.apache.hadoop.util.ChunkedArrayList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Manages datanode decommissioning. A background monitor thread
- * periodically checks the status of datanodes that are in-progress of
- * decommissioning.
- * <p/>
- * A datanode can be decommissioned in a few situations:
- * <ul>
- * <li>If a DN is dead, it is decommissioned immediately.</li>
- * <li>If a DN is alive, it is decommissioned after all of its blocks
- * are sufficiently replicated. Merely under-replicated blocks do not
- * block decommissioning as long as they are above a replication
- * threshold.</li>
- * </ul>
- * In the second case, the datanode transitions to a
- * decommission-in-progress state and is tracked by the monitor thread. The
- * monitor periodically scans through the list of insufficiently replicated
- * blocks on these datanodes to
- * determine if they can be decommissioned. The monitor also prunes this list
- * as blocks become replicated, so monitor scans will become more efficient
- * over time.
- * <p/>
- * Decommission-in-progress nodes that become dead do not progress to
- * decommissioned until they become live again. This prevents potential
- * durability loss for singly-replicated blocks (see HDFS-6791).
- * <p/>
- * This class depends on the FSNamesystem lock for synchronization.
- */
-@InterfaceAudience.Private
-public class DecommissionManager {
- private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager
- .class);
-
- private final Namesystem namesystem;
- private final BlockManager blockManager;
- private final HeartbeatManager hbManager;
- private final ScheduledExecutorService executor;
-
- /**
- * Map containing the DECOMMISSION_INPROGRESS or ENTERING_MAINTENANCE
- * datanodes that are being tracked so they can be be marked as
- * DECOMMISSIONED or IN_MAINTENANCE. Even after the node is marked as
- * IN_MAINTENANCE, the node remains in the map until
- * maintenance expires checked during a monitor tick.
- * <p/>
- * This holds a set of references to the under-replicated blocks on the DN at
- * the time the DN is added to the map, i.e. the blocks that are preventing
- * the node from being marked as decommissioned. During a monitor tick, this
- * list is pruned as blocks becomes replicated.
- * <p/>
- * Note also that the reference to the list of under-replicated blocks
- * will be null on initial add
- * <p/>
- * However, this map can become out-of-date since it is not updated by block
- * reports or other events. Before being finally marking as decommissioned,
- * another check is done with the actual block map.
- */
- private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfo>>
- outOfServiceNodeBlocks;
-
- /**
- * Tracking a node in outOfServiceNodeBlocks consumes additional memory. To
- * limit the impact on NN memory consumption, we limit the number of nodes in
- * outOfServiceNodeBlocks. Additional nodes wait in pendingNodes.
- */
- private final Queue<DatanodeDescriptor> pendingNodes;
-
- private Monitor monitor = null;
-
- DecommissionManager(final Namesystem namesystem,
- final BlockManager blockManager, final HeartbeatManager hbManager) {
- this.namesystem = namesystem;
- this.blockManager = blockManager;
- this.hbManager = hbManager;
-
- executor = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
- .setDaemon(true).build());
- outOfServiceNodeBlocks = new TreeMap<>();
- pendingNodes = new LinkedList<>();
- }
-
- /**
- * Start the decommission monitor thread.
- * @param conf
- */
- void activate(Configuration conf) {
- final int intervalSecs = (int) conf.getTimeDuration(
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT,
- TimeUnit.SECONDS);
- checkArgument(intervalSecs >= 0, "Cannot set a negative " +
- "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
-
- int blocksPerInterval = conf.getInt(
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
-
- final String deprecatedKey =
- "dfs.namenode.decommission.nodes.per.interval";
- final String strNodes = conf.get(deprecatedKey);
- if (strNodes != null) {
- LOG.warn("Deprecated configuration key {} will be ignored.",
- deprecatedKey);
- LOG.warn("Please update your configuration to use {} instead.",
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
- }
-
- checkArgument(blocksPerInterval > 0,
- "Must set a positive value for "
- + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
-
- final int maxConcurrentTrackedNodes = conf.getInt(
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
- DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
- checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
- "value for "
- + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
-
- monitor = new Monitor(blocksPerInterval, maxConcurrentTrackedNodes);
- executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
- TimeUnit.SECONDS);
-
- LOG.debug("Activating DecommissionManager with interval {} seconds, " +
- "{} max blocks per interval, " +
- "{} max concurrently tracked nodes.", intervalSecs,
- blocksPerInterval, maxConcurrentTrackedNodes);
- }
-
- /**
- * Stop the decommission monitor thread, waiting briefly for it to terminate.
- */
- void close() {
- executor.shutdownNow();
- try {
- executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {}
- }
-
- /**
- * Start decommissioning the specified datanode.
- * @param node
- */
- @VisibleForTesting
- public void startDecommission(DatanodeDescriptor node) {
- if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
- // Update DN stats maintained by HeartbeatManager
- hbManager.startDecommission(node);
- // hbManager.startDecommission will set dead node to decommissioned.
- if (node.isDecommissionInProgress()) {
- for (DatanodeStorageInfo storage : node.getStorageInfos()) {
- LOG.info("Starting decommission of {} {} with {} blocks",
- node, storage, storage.numBlocks());
- }
- node.getLeavingServiceStatus().setStartTime(monotonicNow());
- pendingNodes.add(node);
- }
- } else {
- LOG.trace("startDecommission: Node {} in {}, nothing to do." +
- node, node.getAdminState());
- }
- }
-
- /**
- * Stop decommissioning the specified datanode.
- * @param node
- */
- @VisibleForTesting
- public void stopDecommission(DatanodeDescriptor node) {
- if (node.isDecommissionInProgress() || node.isDecommissioned()) {
- // Update DN stats maintained by HeartbeatManager
- hbManager.stopDecommission(node);
- // extra redundancy blocks will be detected and processed when
- // the dead node comes back and send in its full block report.
- if (node.isAlive()) {
- blockManager.processExtraRedundancyBlocksOnInService(node);
- }
- // Remove from tracking in DecommissionManager
- pendingNodes.remove(node);
- outOfServiceNodeBlocks.remove(node);
- } else {
- LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
- node, node.getAdminState());
- }
- }
-
- /**
- * Start maintenance of the specified datanode.
- * @param node
- */
- @VisibleForTesting
- public void startMaintenance(DatanodeDescriptor node,
- long maintenanceExpireTimeInMS) {
- // Even if the node is already in maintenance, we still need to adjust
- // the expiration time.
- node.setMaintenanceExpireTimeInMS(maintenanceExpireTimeInMS);
- if (!node.isMaintenance()) {
- // Update DN stats maintained by HeartbeatManager
- hbManager.startMaintenance(node);
- // hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
- if (node.isEnteringMaintenance()) {
- for (DatanodeStorageInfo storage : node.getStorageInfos()) {
- LOG.info("Starting maintenance of {} {} with {} blocks",
- node, storage, storage.numBlocks());
- }
- node.getLeavingServiceStatus().setStartTime(monotonicNow());
- }
- // Track the node regardless whether it is ENTERING_MAINTENANCE or
- // IN_MAINTENANCE to support maintenance expiration.
- pendingNodes.add(node);
- } else {
- LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
- node, node.getAdminState());
- }
- }
-
-
- /**
- * Stop maintenance of the specified datanode.
- * @param node
- */
- @VisibleForTesting
- public void stopMaintenance(DatanodeDescriptor node) {
- if (node.isMaintenance()) {
- // Update DN stats maintained by HeartbeatManager
- hbManager.stopMaintenance(node);
-
- // extra redundancy blocks will be detected and processed when
- // the dead node comes back and send in its full block report.
- if (!node.isAlive()) {
- // The node became dead when it was in maintenance, at which point
- // the replicas weren't removed from block maps.
- // When the node leaves maintenance, the replicas should be removed
- // from the block maps to trigger the necessary replication to
- // maintain the safety property of "# of live replicas + maintenance
- // replicas" >= the expected redundancy.
- blockManager.removeBlocksAssociatedTo(node);
- } else {
- // Even though putting nodes in maintenance node doesn't cause live
- // replicas to match expected replication factor, it is still possible
- // to have over replicated when the node leaves maintenance node.
- // First scenario:
- // a. Node became dead when it is at AdminStates.NORMAL, thus
- // block is replicated so that 3 replicas exist on other nodes.
- // b. Admins put the dead node into maintenance mode and then
- // have the node rejoin the cluster.
- // c. Take the node out of maintenance mode.
- // Second scenario:
- // a. With replication factor 3, set one replica to maintenance node,
- // thus block has 1 maintenance replica and 2 live replicas.
- // b. Change the replication factor to 2. The block will still have
- // 1 maintenance replica and 2 live replicas.
- // c. Take the node out of maintenance mode.
- blockManager.processExtraRedundancyBlocksOnInService(node);
- }
-
- // Remove from tracking in DecommissionManager
- pendingNodes.remove(node);
- outOfServiceNodeBlocks.remove(node);
- } else {
- LOG.trace("stopMaintenance: Node {} in {}, nothing to do." +
- node, node.getAdminState());
- }
- }
-
- private void setDecommissioned(DatanodeDescriptor dn) {
- dn.setDecommissioned();
- LOG.info("Decommissioning complete for node {}", dn);
- }
-
- private void setInMaintenance(DatanodeDescriptor dn) {
- dn.setInMaintenance();
- LOG.info("Node {} has entered maintenance mode.", dn);
- }
-
- /**
- * Checks whether a block is sufficiently replicated/stored for
- * decommissioning. For replicated blocks or striped blocks, full-strength
- * replication or storage is not always necessary, hence "sufficient".
- * @return true if sufficient, else false.
- */
- private boolean isSufficient(BlockInfo block, BlockCollection bc,
- NumberReplicas numberReplicas, boolean isDecommission) {
- if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
- // Block has enough replica, skip
- LOG.trace("Block {} does not need replication.", block);
- return true;
- }
-
- final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
- numberReplicas);
- final int numLive = numberReplicas.liveReplicas();
-
- // Block is under-replicated
- LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
- numLive);
- if (isDecommission && numExpected > numLive) {
- if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
- // Can decom a UC block as long as there will still be minReplicas
- if (blockManager.hasMinStorage(block, numLive)) {
- LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
- + ">= minR ({})", block, numLive,
- blockManager.getMinStorageNum(block));
- return true;
- } else {
- LOG.trace("UC block {} insufficiently-replicated since numLive "
- + "({}) < minR ({})", block, numLive,
- blockManager.getMinStorageNum(block));
- }
- } else {
- // Can decom a non-UC as long as the default replication is met
- if (numLive >= blockManager.getDefaultStorageNum(block)) {
- return true;
- }
- }
- }
- return false;
- }
-
- private void logBlockReplicationInfo(BlockInfo block,
- BlockCollection bc,
- DatanodeDescriptor srcNode, NumberReplicas num,
- Iterable<DatanodeStorageInfo> storages) {
- if (!NameNode.blockStateChangeLog.isInfoEnabled()) {
- return;
- }
-
- int curReplicas = num.liveReplicas();
- int curExpectedRedundancy = blockManager.getExpectedRedundancyNum(block);
- StringBuilder nodeList = new StringBuilder();
- for (DatanodeStorageInfo storage : storages) {
- final DatanodeDescriptor node = storage.getDatanodeDescriptor();
- nodeList.append(node);
- nodeList.append(" ");
- }
- NameNode.blockStateChangeLog.info(
- "Block: " + block + ", Expected Replicas: "
- + curExpectedRedundancy + ", live replicas: " + curReplicas
- + ", corrupt replicas: " + num.corruptReplicas()
- + ", decommissioned replicas: " + num.decommissioned()
- + ", decommissioning replicas: " + num.decommissioning()
- + ", maintenance replicas: " + num.maintenanceReplicas()
- + ", live entering maintenance replicas: "
- + num.liveEnteringMaintenanceReplicas()
- + ", excess replicas: " + num.excessReplicas()
- + ", Is Open File: " + bc.isUnderConstruction()
- + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
- + srcNode + ", Is current datanode decommissioning: "
- + srcNode.isDecommissionInProgress() +
- ", Is current datanode entering maintenance: "
- + srcNode.isEnteringMaintenance());
- }
-
- @VisibleForTesting
- public int getNumPendingNodes() {
- return pendingNodes.size();
- }
-
- @VisibleForTesting
- public int getNumTrackedNodes() {
- return outOfServiceNodeBlocks.size();
- }
-
- @VisibleForTesting
- public int getNumNodesChecked() {
- return monitor.numNodesChecked;
- }
-
- /**
- * Checks to see if DNs have finished decommissioning.
- * <p/>
- * Since this is done while holding the namesystem lock,
- * the amount of work per monitor tick is limited.
- */
- private class Monitor implements Runnable {
- /**
- * The maximum number of blocks to check per tick.
- */
- private final int numBlocksPerCheck;
- /**
- * The maximum number of nodes to track in outOfServiceNodeBlocks.
- * A value of 0 means no limit.
- */
- private final int maxConcurrentTrackedNodes;
- /**
- * The number of blocks that have been checked on this tick.
- */
- private int numBlocksChecked = 0;
- /**
- * The number of blocks checked after (re)holding lock.
- */
- private int numBlocksCheckedPerLock = 0;
- /**
- * The number of nodes that have been checked on this tick. Used for
- * statistics.
- */
- private int numNodesChecked = 0;
- /**
- * The last datanode in outOfServiceNodeBlocks that we've processed
- */
- private DatanodeDescriptor iterkey = new DatanodeDescriptor(new
- DatanodeID("", "", "", 0, 0, 0, 0));
-
- Monitor(int numBlocksPerCheck, int maxConcurrentTrackedNodes) {
- this.numBlocksPerCheck = numBlocksPerCheck;
- this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
- }
-
- private boolean exceededNumBlocksPerCheck() {
- LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
- return numBlocksChecked >= numBlocksPerCheck;
- }
-
- @Override
- public void run() {
- if (!namesystem.isRunning()) {
- LOG.info("Namesystem is not running, skipping decommissioning checks"
- + ".");
- return;
- }
- // Reset the checked count at beginning of each iteration
- numBlocksChecked = 0;
- numBlocksCheckedPerLock = 0;
- numNodesChecked = 0;
- // Check decommission or maintenance progress.
- namesystem.writeLock();
- try {
- processPendingNodes();
- check();
- } finally {
- namesystem.writeUnlock();
- }
- if (numBlocksChecked + numNodesChecked > 0) {
- LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
- numNodesChecked);
- }
- }
-
- /**
- * Pop datanodes off the pending list and into decomNodeBlocks,
- * subject to the maxConcurrentTrackedNodes limit.
- */
- private void processPendingNodes() {
- while (!pendingNodes.isEmpty() &&
- (maxConcurrentTrackedNodes == 0 ||
- outOfServiceNodeBlocks.size() < maxConcurrentTrackedNodes)) {
- outOfServiceNodeBlocks.put(pendingNodes.poll(), null);
- }
- }
-
- private void check() {
- final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>>
- it = new CyclicIteration<>(outOfServiceNodeBlocks,
- iterkey).iterator();
- final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
-
- while (it.hasNext() && !exceededNumBlocksPerCheck() && namesystem
- .isRunning()) {
- numNodesChecked++;
- final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfo>>
- entry = it.next();
- final DatanodeDescriptor dn = entry.getKey();
- AbstractList<BlockInfo> blocks = entry.getValue();
- boolean fullScan = false;
- if (dn.isMaintenance() && dn.maintenanceExpired()) {
- // If maintenance expires, stop tracking it.
- stopMaintenance(dn);
- toRemove.add(dn);
- continue;
- }
- if (dn.isInMaintenance()) {
- // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
- continue;
- }
- if (blocks == null) {
- // This is a newly added datanode, run through its list to schedule
- // under-replicated blocks for replication and collect the blocks
- // that are insufficiently replicated for further tracking
- LOG.debug("Newly-added node {}, doing full scan to find " +
- "insufficiently-replicated blocks.", dn);
- blocks = handleInsufficientlyStored(dn);
- outOfServiceNodeBlocks.put(dn, blocks);
- fullScan = true;
- } else {
- // This is a known datanode, check if its # of insufficiently
- // replicated blocks has dropped to zero and if it can be decommed
- LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
- pruneReliableBlocks(dn, blocks);
- }
- if (blocks.size() == 0) {
- if (!fullScan) {
- // If we didn't just do a full scan, need to re-check with the
- // full block map.
- //
- // We've replicated all the known insufficiently replicated
- // blocks. Re-check with the full block map before finally
- // marking the datanode as decommissioned
- LOG.debug("Node {} has finished replicating current set of "
- + "blocks, checking with the full block map.", dn);
- blocks = handleInsufficientlyStored(dn);
- outOfServiceNodeBlocks.put(dn, blocks);
- }
- // If the full scan is clean AND the node liveness is okay,
- // we can finally mark as decommissioned.
- final boolean isHealthy =
- blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
- if (blocks.size() == 0 && isHealthy) {
- if (dn.isDecommissionInProgress()) {
- setDecommissioned(dn);
- toRemove.add(dn);
- } else if (dn.isEnteringMaintenance()) {
- // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
- // to track maintenance expiration.
- setInMaintenance(dn);
- } else {
- Preconditions.checkState(false,
- "A node is in an invalid state!");
- }
- LOG.debug("Node {} is sufficiently replicated and healthy, "
- + "marked as {}.", dn.getAdminState());
- } else {
- LOG.debug("Node {} {} healthy."
- + " It needs to replicate {} more blocks."
- + " {} is still in progress.", dn,
- isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
- }
- } else {
- LOG.debug("Node {} still has {} blocks to replicate "
- + "before it is a candidate to finish {}.",
- dn, blocks.size(), dn.getAdminState());
- }
- iterkey = dn;
- }
- // Remove the datanodes that are decommissioned or in service after
- // maintenance expiration.
- for (DatanodeDescriptor dn : toRemove) {
- Preconditions.checkState(dn.isDecommissioned() || dn.isInService(),
- "Removing a node that is not yet decommissioned or in service!");
- outOfServiceNodeBlocks.remove(dn);
- }
- }
-
- /**
- * Removes reliable blocks from the block list of a datanode.
- */
- private void pruneReliableBlocks(final DatanodeDescriptor datanode,
- AbstractList<BlockInfo> blocks) {
- processBlocksInternal(datanode, blocks.iterator(), null, true);
- }
-
- /**
- * Returns a list of blocks on a datanode that are insufficiently replicated
- * or require recovery, i.e. requiring recovery and should prevent
- * decommission.
- * <p/>
- * As part of this, it also schedules replication/recovery work.
- *
- * @return List of blocks requiring recovery
- */
- private AbstractList<BlockInfo> handleInsufficientlyStored(
- final DatanodeDescriptor datanode) {
- AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
- processBlocksInternal(datanode, datanode.getBlockIterator(),
- insufficient, false);
- return insufficient;
- }
-
- /**
- * Used while checking if decommission-in-progress or
- * entering-maintenance datanodes can be marked as decommissioned
- * or in maintenance. Combines the shared logic of
- * pruneReliableBlocks and handleInsufficientlyStored.
- *
- * @param datanode Datanode
- * @param it Iterator over the blocks on the
- * datanode
- * @param insufficientList Return parameter. If it's not null,
- * will contain the insufficiently
- * replicated blocks from the list.
- * @param pruneReliableBlocks whether to remove blocks that are
- * sufficiently reliable from the iterator
- */
- private void processBlocksInternal(
- final DatanodeDescriptor datanode,
- final Iterator<BlockInfo> it,
- final List<BlockInfo> insufficientList,
- boolean pruneReliableBlocks) {
- boolean firstReplicationLog = true;
- // Low redundancy blocks in under-construction (open) files only.
- int lowRedundancyInOpenFiles = 0;
- // All low redundancy blocks. Includes lowRedundancyInOpenFiles.
- int lowRedundancyBlocks = 0;
- // Blocks whose only remaining replicas are on out-of-service
- // (decommissioning or maintenance) nodes.
- int outOfServiceOnlyReplicas = 0;
- while (it.hasNext()) {
- if (insufficientList == null
- && numBlocksCheckedPerLock >= numBlocksPerCheck) {
- // During a full scan, insufficientList is NOT null and the
- // iterator is the DN's own block iterator, so the lock must not
- // be yielded; otherwise a ConcurrentModificationException could
- // occur. Once the full scan is done, the iterator is over a
- // copy, so the lock can safely be yielded.
- // Yielding is required when the number of blocks exceeds the
- // configured per-iteration limit.
- namesystem.writeUnlock();
- try {
- LOG.debug("Yielded lock during decommission check");
- Thread.sleep(0, 500);
- } catch (InterruptedException ignored) {
- return;
- }
- // reset
- numBlocksCheckedPerLock = 0;
- namesystem.writeLock();
- }
- numBlocksChecked++;
- numBlocksCheckedPerLock++;
- final BlockInfo block = it.next();
- // Remove the block from the list if it's no longer in the block map,
- // e.g. the containing file has been deleted
- if (blockManager.blocksMap.getStoredBlock(block) == null) {
- LOG.trace("Removing unknown block {}", block);
- it.remove();
- continue;
- }
-
- long bcId = block.getBlockCollectionId();
- if (bcId == INodeId.INVALID_INODE_ID) {
- // Orphan block, will be invalidated eventually. Skip.
- continue;
- }
-
- final BlockCollection bc = blockManager.getBlockCollection(block);
- final NumberReplicas num = blockManager.countNodes(block);
- final int liveReplicas = num.liveReplicas();
-
- // Schedule low redundancy blocks for reconstruction if not already
- // pending
- boolean isDecommission = datanode.isDecommissionInProgress();
- boolean neededReconstruction = isDecommission ?
- blockManager.isNeededReconstruction(block, num) :
- blockManager.isNeededReconstructionForMaintenance(block, num);
- if (neededReconstruction) {
- if (!blockManager.neededReconstruction.contains(block) &&
- blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
- blockManager.isPopulatingReplQueues()) {
- // Process these blocks only when active NN is out of safe mode.
- blockManager.neededReconstruction.add(block,
- liveReplicas, num.readOnlyReplicas(),
- num.outOfServiceReplicas(),
- blockManager.getExpectedRedundancyNum(block));
- }
- }
-
- // Even if the block is short of its target redundancy, it does
- // not hold up decommission or maintenance as long as its current
- // replication is deemed sufficient.
- if (isSufficient(block, bc, num, isDecommission)) {
- if (pruneReliableBlocks) {
- it.remove();
- }
- continue;
- }
-
- // We've found a block without sufficient redundancy.
- if (insufficientList != null) {
- insufficientList.add(block);
- }
- // Log if this is our first time through
- if (firstReplicationLog) {
- logBlockReplicationInfo(block, bc, datanode, num,
- blockManager.blocksMap.getStorages(block));
- firstReplicationLog = false;
- }
- // Update various counts
- lowRedundancyBlocks++;
- if (bc.isUnderConstruction()) {
- lowRedundancyInOpenFiles++;
- }
- if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
- outOfServiceOnlyReplicas++;
- }
- }
-
- datanode.getLeavingServiceStatus().set(lowRedundancyInOpenFiles,
- lowRedundancyBlocks, outOfServiceOnlyReplicas);
- }
- }
-
- @VisibleForTesting
- void runMonitorForTest() throws ExecutionException, InterruptedException {
- executor.submit(monitor).get();
- }
-}
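The lock-yielding loop in processBlocksInternal above is a general pattern: scan a bounded number of items per acquisition of the namesystem write lock, release the lock briefly so other operations are not starved, and only yield while iterating over a private copy of the block list. A minimal standalone sketch of that pattern follows; the names YieldingScanner, itemsPerLockHold, and process are illustrative, not HDFS APIs.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

class YieldingScanner {
  // Stands in for the namesystem write lock.
  private final ReentrantLock lock = new ReentrantLock();
  // Cf. numBlocksPerCheck / numBlocksCheckedPerLock above.
  private final int itemsPerLockHold;
  private int checkedSinceAcquire = 0;

  YieldingScanner(int itemsPerLockHold) {
    this.itemsPerLockHold = itemsPerLockHold;
  }

  // Scans a snapshot (a copy, never a live collection, to avoid
  // ConcurrentModificationException), yielding the lock periodically.
  void scan(List<String> snapshot) {
    lock.lock();
    try {
      Iterator<String> it = snapshot.iterator();
      while (it.hasNext()) {
        if (checkedSinceAcquire >= itemsPerLockHold) {
          lock.unlock();            // yield so writers can make progress
          try {
            Thread.sleep(0, 500);   // same brief pause as the code above
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;                 // lock not held; finally below is safe
          }
          checkedSinceAcquire = 0;
          lock.lock();
        }
        checkedSinceAcquire++;
        process(it.next());
      }
    } finally {
      if (lock.isHeldByCurrentThread()) {
        lock.unlock();
      }
    }
  }

  private void process(String item) {
    // Per-item work goes here.
  }
}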
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index b859148..318d8e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -471,7 +471,7 @@ public class BackupNode extends NameNode {
* {@link LeaseManager.Monitor} protected by SafeMode.
* {@link BlockManager.RedundancyMonitor} protected by SafeMode.
* {@link HeartbeatManager.Monitor} protected by SafeMode.
- * {@link DecommissionManager.Monitor} need to prohibit refreshNodes().
+ * {@link DatanodeAdminManager.Monitor} needs to prohibit refreshNodes().
* {@link PendingReconstructionBlocks.PendingReconstructionMonitor}
* harmless, because RedundancyMonitor is muted.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 4caee9e..8bf2b8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -960,17 +960,17 @@
<property>
<name>dfs.namenode.decommission.interval</name>
<value>30s</value>
- <description>Namenode periodicity in seconds to check if decommission is
- complete. Support multiple time unit suffix(case insensitive), as described
- in dfs.heartbeat.interval.
+ <description>Namenode periodicity in seconds to check if
+ decommission or maintenance is complete. Supports multiple time unit
+ suffixes (case insensitive), as described in dfs.heartbeat.interval.
</description>
</property>
<property>
<name>dfs.namenode.decommission.blocks.per.interval</name>
<value>500000</value>
- <description>The approximate number of blocks to process per
- decommission interval, as defined in dfs.namenode.decommission.interval.
+ <description>The approximate number of blocks to process per decommission
+ or maintenance interval, as defined in dfs.namenode.decommission.interval.
</description>
</property>
@@ -978,11 +978,12 @@
<name>dfs.namenode.decommission.max.concurrent.tracked.nodes</name>
<value>100</value>
<description>
- The maximum number of decommission-in-progress datanodes nodes that will be
- tracked at one time by the namenode. Tracking a decommission-in-progress
- datanode consumes additional NN memory proportional to the number of blocks
- on the datnode. Having a conservative limit reduces the potential impact
- of decomissioning a large number of nodes at once.
+ The maximum number of decommission-in-progress or
+ entering-maintenance datanodes that will be tracked at one time by
+ the namenode. Tracking these datanodes consumes additional NN memory
+ proportional to the number of blocks on the datanode. Having a
+ conservative limit reduces the potential impact of decommissioning
+ or maintenance of a large number of nodes at once.
A value of 0 means no limit will be enforced.
</description>
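Together, the three properties above bound how much work and memory the monitor spends tracking decommission and maintenance. A hedged sketch of setting them programmatically, the way the tests in this commit do; in a real deployment they would normally go in hdfs-site.xml, and the values shown are just the defaults quoted above.

import org.apache.hadoop.conf.Configuration;

public class DecommissionTuning {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // How often the monitor re-checks progress; time-unit suffixes allowed.
    conf.set("dfs.namenode.decommission.interval", "30s");
    // Approximate number of blocks scanned per monitor interval.
    conf.setInt("dfs.namenode.decommission.blocks.per.interval", 500000);
    // Cap on concurrently tracked decommissioning or entering-maintenance
    // nodes; 0 means unlimited.
    conf.setInt("dfs.namenode.decommission.max.concurrent.tracked.nodes", 100);
    System.out.println("interval = "
        + conf.get("dfs.namenode.decommission.interval"));
  }
}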
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index c2c6be1..ac14a2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -256,9 +256,10 @@ public class TestDecommission extends AdminStatesBaseTest {
startSimpleHACluster(3);
- // Step 1, create a cluster with 4 DNs. Blocks are stored on the first 3 DNs.
- // The last DN is empty. Also configure the last DN to have slow heartbeat
- // so that it will be chosen as excess replica candidate during recommission.
+ // Step 1, create a cluster with 4 DNs. Blocks are stored on the
+ // first 3 DNs. The last DN is empty. Also configure the last DN to have
+ // slow heartbeat so that it will be chosen as excess replica candidate
+ // during recommission.
// Step 1.a, copy blocks to the first 3 DNs. Given the replica count is the
// same as # of DNs, each DN will have a replica for any block.
@@ -290,9 +291,9 @@ public class TestDecommission extends AdminStatesBaseTest {
// Step 3, recommission the first DN on SBN and ANN to create excess replica
// It recommissions the node on SBN first to create potential
- // inconsistent state. In production cluster, such insistent state can happen
- // even if recommission command was issued on ANN first given the async nature
- // of the system.
+ // inconsistent state. In a production cluster, such inconsistent
+ // state can happen even if the recommission command was issued on
+ // ANN first, given the async nature of the system.
// Step 3.a, ask SBN to recomm the first DN.
// SBN has been fixed so that it no longer invalidates excess replica during
@@ -301,10 +302,10 @@ public class TestDecommission extends AdminStatesBaseTest {
// 1. the last DN would have been chosen as excess replica, given its
// heartbeat is considered old.
// Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
- // 2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
+ // 2. After recommissionNode finishes, SBN has 3 live replicas (0, 1, 2)
// and one excess replica ( 3 )
// After the fix,
- // After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
+ // After recommissionNode finishes, SBN has 4 live replicas (0, 1, 2, 3)
Thread.sleep(slowHeartbeatDNwaitTime);
putNodeInService(1, decomNodeFromSBN);
@@ -561,7 +562,8 @@ public class TestDecommission extends AdminStatesBaseTest {
* federated cluster.
*/
@Test(timeout=360000)
- public void testHostsFileFederation() throws IOException, InterruptedException {
+ public void testHostsFileFederation()
+ throws IOException, InterruptedException {
// Test for 3 namenode federated cluster
testHostsFile(3);
}
@@ -598,7 +600,8 @@ public class TestDecommission extends AdminStatesBaseTest {
}
@Test(timeout=120000)
- public void testDecommissionWithOpenfile() throws IOException, InterruptedException {
+ public void testDecommissionWithOpenfile()
+ throws IOException, InterruptedException {
LOG.info("Starting test testDecommissionWithOpenfile");
//At most 4 nodes will be decommissioned
@@ -742,14 +745,15 @@ public class TestDecommission extends AdminStatesBaseTest {
// make sure the two datanodes remain in decomm in progress state
BlockManagerTestUtil.recheckDecommissionState(dm);
- assertTrackedAndPending(dm.getDecomManager(), 2, 0);
+ assertTrackedAndPending(dm.getDatanodeAdminManager(), 2, 0);
}
/**
* Tests restart of namenode while datanode hosts are added to exclude file
**/
@Test(timeout=360000)
- public void testDecommissionWithNamenodeRestart()throws IOException, InterruptedException {
+ public void testDecommissionWithNamenodeRestart()
+ throws IOException, InterruptedException {
LOG.info("Starting test testDecommissionWithNamenodeRestart");
int numNamenodes = 1;
int numDatanodes = 1;
@@ -914,7 +918,7 @@ public class TestDecommission extends AdminStatesBaseTest {
@Test(timeout=120000)
public void testBlocksPerInterval() throws Exception {
- org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+ org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
.setLevel(Level.TRACE);
// Turn the blocks per interval way down
getConf().setInt(
@@ -927,7 +931,8 @@ public class TestDecommission extends AdminStatesBaseTest {
final FileSystem fs = getCluster().getFileSystem();
final DatanodeManager datanodeManager =
getCluster().getNamesystem().getBlockManager().getDatanodeManager();
- final DecommissionManager decomManager = datanodeManager.getDecomManager();
+ final DatanodeAdminManager decomManager =
+ datanodeManager.getDatanodeAdminManager();
// Write a 3 block file, so each node has one block. Should scan 3 nodes.
DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
@@ -944,7 +949,7 @@ public class TestDecommission extends AdminStatesBaseTest {
}
private void doDecomCheck(DatanodeManager datanodeManager,
- DecommissionManager decomManager, int expectedNumCheckedNodes)
+ DatanodeAdminManager decomManager, int expectedNumCheckedNodes)
throws IOException, ExecutionException, InterruptedException {
// Decom all nodes
ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
@@ -965,7 +970,7 @@ public class TestDecommission extends AdminStatesBaseTest {
@Test(timeout=120000)
public void testPendingNodes() throws Exception {
- org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+ org.apache.log4j.Logger.getLogger(DatanodeAdminManager.class)
.setLevel(Level.TRACE);
// Only allow one node to be decom'd at a time
getConf().setInt(
@@ -978,7 +983,8 @@ public class TestDecommission extends AdminStatesBaseTest {
final FileSystem fs = getCluster().getFileSystem();
final DatanodeManager datanodeManager =
getCluster().getNamesystem().getBlockManager().getDatanodeManager();
- final DecommissionManager decomManager = datanodeManager.getDecomManager();
+ final DatanodeAdminManager decomManager =
+ datanodeManager.getDatanodeAdminManager();
// Keep a file open to prevent decom from progressing
HdfsDataOutputStream open1 =
@@ -1014,7 +1020,7 @@ public class TestDecommission extends AdminStatesBaseTest {
assertTrackedAndPending(decomManager, 1, 0);
}
- private void assertTrackedAndPending(DecommissionManager decomManager,
+ private void assertTrackedAndPending(DatanodeAdminManager decomManager,
int tracked, int pending) {
assertEquals("Unexpected number of tracked nodes", tracked,
decomManager.getNumTrackedNodes());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
index 77e2ffb..7ee766f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
@@ -328,7 +328,7 @@ public class BlockManagerTestUtil {
*/
public static void recheckDecommissionState(DatanodeManager dm)
throws ExecutionException, InterruptedException {
- dm.getDecomManager().runMonitorForTest();
+ dm.getDatanodeAdminManager().runMonitorForTest();
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
index 4ecfd50..aaa4899 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReconstructStripedBlocksWithRackAwareness.java
@@ -330,8 +330,9 @@ public class TestReconstructStripedBlocksWithRackAwareness {
// start decommissioning h9
boolean satisfied = bm.isPlacementPolicySatisfied(blockInfo);
Assert.assertFalse(satisfied);
- final DecommissionManager decomManager =
- (DecommissionManager) Whitebox.getInternalState(dm, "decomManager");
+ final DatanodeAdminManager decomManager =
+ (DatanodeAdminManager) Whitebox.getInternalState(
+ dm, "datanodeAdminManager");
cluster.getNamesystem().writeLock();
try {
dn9.stopDecommission();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index bcd8245..fef0b45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -100,7 +100,7 @@ public class TestReplicationPolicyConsiderLoad
// returns false
for (int i = 0; i < 3; i++) {
DatanodeDescriptor d = dataNodes[i];
- dnManager.getDecomManager().startDecommission(d);
+ dnManager.getDatanodeAdminManager().startDecommission(d);
d.setDecommissioned();
}
assertEquals((double)load/3, dnManager.getFSClusterStats()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index 11d7431..cfebff7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
@@ -100,7 +100,7 @@ public class TestDecommissioningStatus {
fileSys = cluster.getFileSystem();
cluster.getNamesystem().getBlockManager().getDatanodeManager()
.setHeartbeatExpireInterval(3000);
- Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
+ Logger.getLogger(DatanodeAdminManager.class).setLevel(Level.DEBUG);
LOG = Logger.getLogger(TestDecommissioningStatus.class);
}
@@ -344,7 +344,7 @@ public class TestDecommissioningStatus {
*/
@Test(timeout=120000)
public void testDecommissionDeadDN() throws Exception {
- Logger log = Logger.getLogger(DecommissionManager.class);
+ Logger log = Logger.getLogger(DatanodeAdminManager.class);
log.setLevel(Level.DEBUG);
DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
String dnName = dnID.getXferAddr();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/79df1e75/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
index eab1199..205593f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDefaultBlockPlacementPolicy.java
@@ -156,11 +156,11 @@ public class TestDefaultBlockPlacementPolicy {
DatanodeDescriptor dnd3 = dnm.getDatanode(
cluster.getDataNodes().get(3).getDatanodeId());
assertEquals(dnd3.getNetworkLocation(), clientRack);
- dnm.getDecomManager().startDecommission(dnd3);
+ dnm.getDatanodeAdminManager().startDecommission(dnd3);
try {
testPlacement(clientMachine, clientRack, false);
} finally {
- dnm.getDecomManager().stopDecommission(dnd3);
+ dnm.getDatanodeAdminManager().stopDecommission(dnd3);
}
}
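For callers, the net effect of the rename is mechanical: every dnm.getDecomManager() becomes dnm.getDatanodeAdminManager(), as the test changes above show. A condensed sketch of the migrated call pattern follows; decommissionThenRestore is an illustrative helper, not part of HDFS, while the getter and the start/stopDecommission calls are taken from the test diffs above.

import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeAdminManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;

final class AdminManagerMigration {
  // Old call sites used dnm.getDecomManager(); new ones use
  // dnm.getDatanodeAdminManager(), which also handles maintenance states.
  static void decommissionThenRestore(DatanodeManager dnm,
      DatanodeDescriptor dnd) {
    DatanodeAdminManager adminManager = dnm.getDatanodeAdminManager();
    adminManager.startDecommission(dnd); // enters DECOMMISSION_INPROGRESS
    try {
      // ... work while the node drains its replicas ...
    } finally {
      adminManager.stopDecommission(dnd); // return the node to service
    }
  }
}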