You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ar...@apache.org on 2016/03/09 18:45:18 UTC
[18/34] hadoop git commit: HDFS-8786. Erasure coding: use simple replication for internal blocks on decommissioning datanodes. Contributed by Rakesh R.
HDFS-8786. Erasure coding: use simple replication for internal blocks on decommissioning datanodes. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/743a99f2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/743a99f2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/743a99f2
Branch: refs/heads/HDFS-1312
Commit: 743a99f2dbc9a27e19f92ff3551937d90dba2e89
Parents: f86850b
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Mar 8 10:24:57 2016 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Mar 8 10:24:57 2016 -0800
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 14 +-
.../BlockReconstructionWork.java | 19 +-
.../blockmanagement/ErasureCodingWork.java | 67 ++-
.../server/blockmanagement/ReplicationWork.java | 2 +-
.../hdfs/TestDecommissionWithStriped.java | 473 +++++++++++++++++++
5 files changed, 547 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/743a99f2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4123654..f12ea1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1610,7 +1610,7 @@ public class BlockManager implements BlockStatsMXBean {
return null;
}
- final int additionalReplRequired;
+ int additionalReplRequired;
if (numReplicas.liveReplicas() < requiredReplication) {
additionalReplRequired = requiredReplication - numReplicas.liveReplicas()
- pendingNum;
@@ -1624,6 +1624,13 @@ public class BlockManager implements BlockStatsMXBean {
// Wait the previous reconstruction to finish.
return null;
}
+
+ // should reconstruct all the internal blocks before scheduling
+ // replication task for decommissioning node(s).
+ if (additionalReplRequired - numReplicas.decommissioning() > 0) {
+ additionalReplRequired = additionalReplRequired
+ - numReplicas.decommissioning();
+ }
byte[] indices = new byte[liveBlockIndices.size()];
for (int i = 0 ; i < liveBlockIndices.size(); i++) {
indices[i] = liveBlockIndices.get(i);
@@ -1679,10 +1686,13 @@ public class BlockManager implements BlockStatsMXBean {
// No use continuing, unless a new rack in this case
return false;
}
+ // mark that the reconstruction work is to replicate internal block to a
+ // new rack.
+ rw.setNotEnoughRack();
}
// Add block to the datanode's task list
- rw.addTaskToDatanode();
+ rw.addTaskToDatanode(numReplicas);
DatanodeStorageInfo.incrementBlocksScheduled(targets);
// Move the block-replication into a "pending" state.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/743a99f2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
index c1998ee..57121bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReconstructionWork.java
@@ -47,6 +47,7 @@ abstract class BlockReconstructionWork {
private DatanodeStorageInfo[] targets;
private final int priority;
+ private boolean notEnoughRack = false;
public BlockReconstructionWork(BlockInfo block,
BlockCollection bc,
@@ -105,12 +106,26 @@ abstract class BlockReconstructionWork {
return additionalReplRequired;
}
+ /**
+ * Flags this work as only needing to copy one internal block to a new rack,
+ * because the block does not yet span enough racks.
+ */
+ void setNotEnoughRack() {
+ this.notEnoughRack = true;
+ }
+
+ /**
+ * @return whether {@link #setNotEnoughRack()} has been called on this work.
+ */
+ boolean hasNotEnoughRack() {
+ return this.notEnoughRack;
+ }
+
+ /**
+ * Choose the target storages for this reconstruction work.
+ * NOTE(review): presumably the chosen storages are what getTargets() returns
+ * afterwards; confirm in the subclasses.
+ */
abstract void chooseTargets(BlockPlacementPolicy blockplacement,
BlockStoragePolicySuite storagePolicySuite,
Set<Node> excludedNodes);
/**
- * add reconstruction task into a source datanode
+ * Add the reconstruction task to the datanode(s) that will perform it: a
+ * source datanode when a block (or an internal block of a striped block) is
+ * simply copied, or the first target datanode when a striped block is
+ * reconstructed by erasure decoding.
+ *
+ * @param numberReplicas replica counts of the block; implementations may use
+ * them (e.g. the number of decommissioning replicas) to decide which kind
+ * of task to schedule
*/
- abstract void addTaskToDatanode();
+ abstract void addTaskToDatanode(NumberReplicas numberReplicas);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/743a99f2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
index 7877c56..d110b30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.net.Node;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
@@ -121,33 +120,55 @@ class ErasureCodingWork extends BlockReconstructionWork {
}
+ /**
+ * Queue the work for this striped block on the datanode(s) that perform it:
+ * <ul>
+ * <li>if the block group does not span enough racks, one internal block is
+ * copied from a chosen source to the first target;</li>
+ * <li>else, if some replicas are decommissioning and all internal blocks are
+ * available, the internal blocks on decommissioning sources are copied
+ * one-to-one to the targets, without decoding;</li>
+ * <li>otherwise the first target is asked to reconstruct the block group by
+ * erasure decoding from the source nodes.</li>
+ * </ul>
+ *
+ * @param numberReplicas replica counts of the block; only the number of
+ * decommissioning replicas is consulted here
+ */
@Override
- void addTaskToDatanode() {
- assert getTargets().length > 0;
+ void addTaskToDatanode(NumberReplicas numberReplicas) {
+ final DatanodeStorageInfo[] targets = getTargets();
+ assert targets.length > 0;
BlockInfoStriped stripedBlk = (BlockInfoStriped) getBlock();
- // if we already have all the internal blocks, but not enough racks,
- // we only need to replicate one internal block to a new rack
- if (hasAllInternalBlocks()) {
+ if (hasNotEnoughRack()) {
+ // if we already have all the internal blocks, but not enough racks,
+ // we only need to replicate one internal block to a new rack
int sourceIndex = chooseSource4SimpleReplication();
- final byte blockIndex = liveBlockIndicies[sourceIndex];
- final DatanodeDescriptor source = getSrcNodes()[sourceIndex];
- final long internBlkLen = StripedBlockUtil.getInternalBlockLength(
- stripedBlk.getNumBytes(), stripedBlk.getCellSize(),
- stripedBlk.getDataBlockNum(), blockIndex);
- final Block targetBlk = new Block(
- stripedBlk.getBlockId() + blockIndex, internBlkLen,
- stripedBlk.getGenerationStamp());
- source.addBlockToBeReplicated(targetBlk, getTargets());
- if (BlockManager.LOG.isDebugEnabled()) {
- BlockManager.LOG.debug("Add replication task from source {} to " +
- "targets {} for EC block {}", source, Arrays.toString(getTargets()),
- targetBlk);
+ createReplicationWork(sourceIndex, targets[0]);
+ } else if (numberReplicas.decommissioning() > 0 && hasAllInternalBlocks()) {
+ // all internal blocks exist, so the ones stored on decommissioning
+ // nodes can be copied directly instead of being decoded
+ List<Integer> decommissioningSources = findDecommissioningSources();
+ // decommissioningSources.size() should be >= targets.length
+ // NOTE(review): if fewer decommissioning sources than targets are found,
+ // the extra targets get no task here, although BlockManager counts every
+ // target as scheduled after this call; confirm this is acceptable.
+ final int num = Math.min(decommissioningSources.size(), targets.length);
+ for (int i = 0; i < num; i++) {
+ createReplicationWork(decommissioningSources.get(i), targets[i]);
}
} else {
+ // reconstruct the block group by erasure decoding on the first target
- getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
- new ExtendedBlock(blockPoolId, stripedBlk),
- getSrcNodes(), getTargets(), getLiveBlockIndicies(),
- stripedBlk.getErasureCodingPolicy());
+ targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
+ new ExtendedBlock(blockPoolId, stripedBlk), getSrcNodes(), targets,
+ getLiveBlockIndicies(), stripedBlk.getErasureCodingPolicy());
}
}
+
+ /**
+ * Schedules a plain copy of one internal block of this striped block from
+ * the source at {@code sourceIndex} (an index into the source nodes and the
+ * live block indices) to {@code target}.
+ */
+ private void createReplicationWork(int sourceIndex,
+ DatanodeStorageInfo target) {
+ final BlockInfoStriped group = (BlockInfoStriped) getBlock();
+ final DatanodeDescriptor srcNode = getSrcNodes()[sourceIndex];
+ final byte idxInGroup = liveBlockIndicies[sourceIndex];
+ final Block internalBlk = new Block(group.getBlockId() + idxInGroup,
+ StripedBlockUtil.getInternalBlockLength(group.getNumBytes(),
+ group.getCellSize(), group.getDataBlockNum(), idxInGroup),
+ group.getGenerationStamp());
+ final DatanodeStorageInfo[] dest = {target};
+ srcNode.addBlockToBeReplicated(internalBlk, dest);
+ if (BlockManager.LOG.isDebugEnabled()) {
+ BlockManager.LOG.debug("Add replication task from source {} to "
+ + "target {} for EC block {}", srcNode, target, internalBlk);
+ }
+ }
+
+ private List<Integer> findDecommissioningSources() {
+ List<Integer> srcIndices = new ArrayList<>();
+ for (int i = 0; i < getSrcNodes().length; i++) {
+ if (getSrcNodes()[i].isDecommissionInProgress()) {
+ srcIndices.add(i);
+ }
+ }
+ return srcIndices;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/743a99f2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
index 24601a2..f4d274a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java
@@ -55,7 +55,7 @@ class ReplicationWork extends BlockReconstructionWork {
}
+ /**
+ * Queue replication of the block from the first source node to all chosen
+ * targets. {@code numberReplicas} is not used by this implementation.
+ */
@Override
- void addTaskToDatanode() {
+ void addTaskToDatanode(NumberReplicas numberReplicas) {
getSrcNodes()[0].addBlockToBeReplicated(getBlock(), getTargets());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/743a99f2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
new file mode 100644
index 0000000..bde2ceb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -0,0 +1,500 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_DATA_BLOCKS;
+import static org.apache.hadoop.hdfs.StripedFileTestUtil.NUM_PARITY_BLOCKS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class tests the decommissioning of datanode with striped blocks.
+ */
+public class TestDecommissionWithStriped {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestDecommissionWithStriped.class);
+
+ // heartbeat interval in seconds
+ private static final int HEARTBEAT_INTERVAL = 1;
+ // block report interval in msec
+ private static final int BLOCKREPORT_INTERVAL_MSEC = 1000;
+ // namenode replication monitor interval in seconds
+ private static final int NAMENODE_REPLICATION_INTERVAL = 1;
+
+ private Path decommissionDir;
+ private Path hostsFile;
+ private Path excludeFile;
+ private FileSystem localFileSys;
+
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem dfs;
+ private int numDNs;
+ private final int blockSize = StripedFileTestUtil.blockSize;
+ private final int cellSize = StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE;
+ private final int dataBlocks = StripedFileTestUtil.NUM_DATA_BLOCKS;
+ private final Path ecDir = new Path("/" + this.getClass().getSimpleName());
+
+ private FSNamesystem fsn;
+ private BlockManager bm;
+ private DFSClient client;
+
+ /**
+ * Starts a MiniDFSCluster with two more datanodes than a block group needs
+ * and creates the erasure coded test directory.
+ */
+ @Before
+ public void setup() throws IOException {
+ conf = new HdfsConfiguration();
+
+ // Set up the hosts/exclude files.
+ localFileSys = FileSystem.getLocal(conf);
+ Path workingDir = localFileSys.getWorkingDirectory();
+ decommissionDir = new Path(workingDir,
+ PathUtils.getTestDirName(getClass()) + "/work-dir/decommission");
+ hostsFile = new Path(decommissionDir, "hosts");
+ excludeFile = new Path(decommissionDir, "exclude");
+ writeConfigFile(hostsFile, null);
+ writeConfigFile(excludeFile, null);
+
+ // Setup conf
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY,
+ false);
+ conf.set(DFSConfigKeys.DFS_HOSTS, hostsFile.toUri().getPath());
+ conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+ 2000);
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+ conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+ BLOCKREPORT_INTERVAL_MSEC);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
+ 4);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+ NAMENODE_REPLICATION_INTERVAL);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+ conf.setInt(
+ DFSConfigKeys.DFS_DN_EC_RECONSTRUCTION_STRIPED_READ_BUFFER_SIZE_KEY,
+ StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE - 1);
+
+ numDNs = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS + 2;
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster.waitActive();
+ dfs = cluster.getFileSystem(0);
+ fsn = cluster.getNamesystem();
+ bm = fsn.getBlockManager();
+ client = getDfsClient(cluster.getNameNode(0), conf);
+
+ dfs.mkdirs(ecDir);
+ dfs.setErasureCodingPolicy(ecDir, null);
+ }
+
+ /** Removes the work directory, closes the client, stops the cluster. */
+ @After
+ public void teardown() throws IOException {
+ cleanupFile(localFileSys, decommissionDir);
+ if (client != null) {
+ client.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ cluster = null;
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testFileFullBlockGroup() throws Exception {
+ LOG.info("Starting test testFileFullBlockGroup");
+ testDecommission(blockSize * dataBlocks, 9, 1, "testFileFullBlockGroup");
+ }
+
+ @Test(timeout = 120000)
+ public void testFileSmallerThanOneCell() throws Exception {
+ LOG.info("Starting test testFileSmallerThanOneCell");
+ testDecommission(cellSize - 1, 4, 1, "testFileSmallerThanOneCell");
+ }
+
+ @Test(timeout = 120000)
+ public void testFileSmallerThanOneStripe() throws Exception {
+ LOG.info("Starting test testFileSmallerThanOneStripe");
+ testDecommission(cellSize * 2, 5, 1, "testFileSmallerThanOneStripe");
+ }
+
+ @Test(timeout = 120000)
+ public void testDecommissionTwoNodes() throws Exception {
+ LOG.info("Starting test testDecommissionTwoNodes");
+ testDecommission(blockSize * dataBlocks, 9, 2, "testDecommissionTwoNodes");
+ }
+
+ /**
+ * Decommissions one node of a block group while another node of the same
+ * group is stopped, so the group is under-replicated when decommissioning
+ * starts. The spare datanodes are stopped first, to avoid immediate
+ * reconstruction, and restarted once decommissioning has been triggered.
+ */
+ @Test(timeout = 120000)
+ public void testDecommissionWithURBlockForSameBlockGroup() throws Exception {
+ LOG.info("Starting test testDecommissionWithURBlockForSameBlockGroup");
+
+ final Path ecFile = new Path(ecDir, "testDecommissionWithURBlocks");
+ int writeBytes = BLOCK_STRIPED_CELL_SIZE * NUM_DATA_BLOCKS * 2;
+ writeStripedFile(dfs, ecFile, writeBytes);
+ Assert.assertEquals(0, bm.numOfUnderReplicatedBlocks());
+
+ final List<DatanodeInfo> decommissionNodes = new ArrayList<>();
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(ecFile.toString(), 0)
+ .get(0);
+ DatanodeInfo[] dnLocs = lb.getLocations();
+ assertEquals(NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS, dnLocs.length);
+ int decommNodeIndex = NUM_DATA_BLOCKS - 1;
+ int stopNodeIndex = 1;
+
+ // add the nodes which will be decommissioning
+ decommissionNodes.add(dnLocs[decommNodeIndex]);
+
+ // stop excess dns to avoid immediate reconstruction.
+ DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+ List<DataNodeProperties> stoppedDns = new ArrayList<>();
+ for (DatanodeInfo liveDn : info) {
+ boolean usedNode = false;
+ for (DatanodeInfo datanodeInfo : dnLocs) {
+ if (liveDn.getXferAddr().equals(datanodeInfo.getXferAddr())) {
+ usedNode = true;
+ break;
+ }
+ }
+ if (!usedNode) {
+ DataNode dn = cluster.getDataNode(liveDn.getIpcPort());
+ stoppedDns.add(cluster.stopDataNode(liveDn.getXferAddr()));
+ cluster.setDataNodeDead(dn.getDatanodeId());
+ LOG.info("stop datanode " + dn.getDatanodeId().getHostName());
+ }
+ }
+ DataNode dn = cluster.getDataNode(dnLocs[stopNodeIndex].getIpcPort());
+ cluster.stopDataNode(dnLocs[stopNodeIndex].getXferAddr());
+ cluster.setDataNodeDead(dn.getDatanodeId());
+ numDNs = numDNs - 1;
+
+ // Decommission node in a new thread. Verify that node is decommissioned.
+ final CountDownLatch decomStarted = new CountDownLatch(1);
+ Thread decomTh = new Thread() {
+ @Override
+ public void run() {
+ try {
+ decomStarted.countDown();
+ decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSIONED);
+ } catch (Exception e) {
+ LOG.error("Exception while decommissioning", e);
+ Assert.fail("Shouldn't throw exception!");
+ }
+ }
+ };
+ int deadDecommissioned = fsn.getNumDecomDeadDataNodes();
+ int liveDecommissioned = fsn.getNumDecomLiveDataNodes();
+ decomTh.start();
+ decomStarted.await(5, TimeUnit.SECONDS);
+ Thread.sleep(3000); // grace period to trigger decommissioning call
+ // start datanode so that decommissioning live node will be finished
+ for (DataNodeProperties dnp : stoppedDns) {
+ cluster.restartDataNode(dnp, true);
+ LOG.info("Restarts stopped datanode:{} to trigger block reconstruction",
+ dnp.datanode);
+ }
+ cluster.waitActive();
+
+ LOG.info("Waiting to finish decommissioning node:{}", decommissionNodes);
+ decomTh.join(20000); // waiting 20secs to finish decommission
+ Assert.assertFalse("Decommissioning did not finish within 20 seconds",
+ decomTh.isAlive());
+ LOG.info("Finished decommissioning node:{}", decommissionNodes);
+
+ assertEquals(deadDecommissioned, fsn.getNumDecomDeadDataNodes());
+ assertEquals(liveDecommissioned + decommissionNodes.size(),
+ fsn.getNumDecomLiveDataNodes());
+
+ // Ensure decommissioned datanode is not automatically shutdown
+ assertEquals("All datanodes must be alive", numDNs,
+ client.datanodeReport(DatanodeReportType.LIVE).length);
+
+ assertNull(checkFile(dfs, ecFile, 9, decommissionNodes, numDNs));
+ StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommissionNodes,
+ null);
+ cleanupFile(dfs, ecFile);
+ }
+
+ /**
+ * Writes a striped file of {@code writeBytes} bytes, decommissions
+ * {@code decomNodeCount} nodes holding its first block group and verifies
+ * the decommission counters, the block locations and the file content.
+ */
+ private void testDecommission(int writeBytes, int storageCount,
+ int decomNodeCount, String filename) throws Exception {
+ Path ecFile = new Path(ecDir, filename);
+ writeStripedFile(dfs, ecFile, writeBytes);
+ List<DatanodeInfo> decommissionNodes = getDecommissionDatanode(dfs, ecFile,
+ writeBytes, decomNodeCount);
+
+ int deadDecommissioned = fsn.getNumDecomDeadDataNodes();
+ int liveDecommissioned = fsn.getNumDecomLiveDataNodes();
+ // Decommission node. Verify that node is decommissioned.
+ decommissionNode(0, decommissionNodes, AdminStates.DECOMMISSIONED);
+
+ assertEquals(deadDecommissioned, fsn.getNumDecomDeadDataNodes());
+ assertEquals(liveDecommissioned + decommissionNodes.size(),
+ fsn.getNumDecomLiveDataNodes());
+
+ // Ensure decommissioned datanode is not automatically shutdown
+ assertEquals("All datanodes must be alive", numDNs,
+ client.datanodeReport(DatanodeReportType.LIVE).length);
+
+ assertNull(checkFile(dfs, ecFile, storageCount, decommissionNodes, numDNs));
+ StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommissionNodes,
+ null);
+ cleanupFile(dfs, ecFile);
+ }
+
+ /**
+ * Returns up to {@code decomNodeCount} live datanodes that hold internal
+ * blocks of the first block group of {@code ecFile}.
+ */
+ private List<DatanodeInfo> getDecommissionDatanode(DistributedFileSystem dfs,
+ Path ecFile, int writeBytes, int decomNodeCount) throws IOException {
+ List<DatanodeInfo> decommissionedNodes = new ArrayList<>();
+ DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
+ BlockLocation[] fileBlockLocations = dfs.getFileBlockLocations(ecFile, 0,
+ writeBytes);
+ for (String dnName : fileBlockLocations[0].getNames()) {
+ for (DatanodeInfo dn : info) {
+ if (dnName.equals(dn.getXferAddr())) {
+ decommissionedNodes.add(dn);
+ }
+ if (decommissionedNodes.size() >= decomNodeCount) {
+ return decommissionedNodes;
+ }
+ }
+ }
+ return decommissionedNodes;
+ }
+
+ /* Get a new DFSClient to the namenode; the caller must close it. */
+ private static DFSClient getDfsClient(NameNode nn, Configuration conf)
+ throws IOException {
+ return new DFSClient(nn.getNameNodeAddress(), conf);
+ }
+
+ /** Writes {@code writeBytes} test bytes to ecFile and verifies them. */
+ private void writeStripedFile(DistributedFileSystem dfs, Path ecFile,
+ int writeBytes) throws Exception {
+ byte[] bytes = StripedFileTestUtil.generateBytes(writeBytes);
+ DFSTestUtil.writeFile(dfs, ecFile, new String(bytes));
+ StripedFileTestUtil.waitBlockGroupsReported(dfs, ecFile.toString());
+
+ StripedFileTestUtil.checkData(dfs, ecFile, writeBytes,
+ new ArrayList<DatanodeInfo>(), null);
+ }
+
+ /** Rewrites {@code name} with one node per line (empty if nodes is null). */
+ private void writeConfigFile(Path name, List<String> nodes)
+ throws IOException {
+ // delete if it already exists
+ if (localFileSys.exists(name)) {
+ localFileSys.delete(name, true);
+ }
+
+ try (FSDataOutputStream stm = localFileSys.create(name)) {
+ if (nodes != null) {
+ for (Iterator<String> it = nodes.iterator(); it.hasNext();) {
+ String node = it.next();
+ stm.writeBytes(node);
+ stm.writeBytes("\n");
+ }
+ }
+ }
+ }
+
+ /** Deletes {@code name} recursively, asserting it existed and is gone. */
+ private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+ assertTrue(fileSys.exists(name));
+ fileSys.delete(name, true);
+ assertTrue(!fileSys.exists(name));
+ }
+
+ /*
+ * Decommission the given datanodes by writing them to the exclude file, then
+ * wait for each of them to reach the given {@code waitForState}.
+ */
+ private void decommissionNode(int nnIndex,
+ List<DatanodeInfo> decommissionedNodes, AdminStates waitForState)
+ throws IOException {
+ DatanodeInfo[] info;
+ try (DFSClient nnClient = getDfsClient(cluster.getNameNode(nnIndex), conf)) {
+ info = nnClient.datanodeReport(DatanodeReportType.LIVE);
+ }
+
+ // write nodename into the exclude file.
+ List<String> excludeNodes = new ArrayList<>();
+ for (DatanodeInfo dn : decommissionedNodes) {
+ boolean nodeExists = false;
+ for (DatanodeInfo dninfo : info) {
+ if (dninfo.getDatanodeUuid().equals(dn.getDatanodeUuid())) {
+ nodeExists = true;
+ break;
+ }
+ }
+ assertTrue("Datanode: " + dn + " is not LIVE", nodeExists);
+ excludeNodes.add(dn.getName());
+ LOG.info("Decommissioning node: " + dn.getName());
+ }
+ writeConfigFile(excludeFile, excludeNodes);
+ refreshNodes(cluster.getNamesystem(nnIndex), conf);
+ for (DatanodeInfo dn : decommissionedNodes) {
+ DatanodeInfo ret = NameNodeAdapter
+ .getDatanode(cluster.getNamesystem(nnIndex), dn);
+ waitNodeState(ret, waitForState);
+ }
+ }
+
+ /** Refreshes the datanode manager so exclude file changes take effect. */
+ private static void refreshNodes(final FSNamesystem ns,
+ final Configuration conf) throws IOException {
+ ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
+ }
+
+ /*
+ * Wait till the node reaches the given admin state.
+ */
+ private void waitNodeState(DatanodeInfo node, AdminStates state) {
+ boolean done = state == node.getAdminState();
+ while (!done) {
+ LOG.info("Waiting for node " + node + " to change state to " + state
+ + " current state: " + node.getAdminState());
+ try {
+ Thread.sleep(HEARTBEAT_INTERVAL * 500);
+ } catch (InterruptedException e) {
+ // ignore and re-check the admin state
+ }
+ done = state == node.getAdminState();
+ }
+ LOG.info("node " + node + " reached the state " + state);
+ }
+
+ /**
+ * Verify that the number of replicas is as expected for each block in the
+ * given file. For each block, the expected number of locations is
+ * {@code repl} plus the number of its locations on decommissioned nodes,
+ * capped at {@code numDatanodes}. Locations on nodes listed in
+ * {@code decommissionedNodes} must be decommissioned; others must not be.
+ *
+ * @param repl expected locations per block, excluding decommissioned ones
+ * @param decommissionedNodes nodes expected to be decommissioned; empty if
+ * there is no decommissioned node for this file.
+ * @return null if no failure found, else an error message string.
+ */
+ private static String checkFile(FileSystem fileSys, Path name, int repl,
+ List<DatanodeInfo> decommissionedNodes, int numDatanodes)
+ throws IOException {
+ boolean isNodeDown = decommissionedNodes.size() > 0;
+ // need a raw stream
+ assertTrue("Not HDFS:" + fileSys.getUri(),
+ fileSys instanceof DistributedFileSystem);
+ Collection<LocatedBlock> dinfo;
+ try (HdfsDataInputStream dis = (HdfsDataInputStream) fileSys.open(name)) {
+ dinfo = dis.getAllBlocks();
+ }
+ for (LocatedBlock blk : dinfo) { // for each block
+ int hasdown = 0;
+ DatanodeInfo[] nodes = blk.getLocations();
+ for (int j = 0; j < nodes.length; j++) { // for each replica
+ LOG.info("Block Locations size={}, locs={}, j={}", nodes.length,
+ nodes[j].toString(), j);
+ boolean found = false;
+ for (DatanodeInfo datanodeInfo : decommissionedNodes) {
+ // check against decommissioned list
+ if (isNodeDown
+ && nodes[j].getXferAddr().equals(datanodeInfo.getXferAddr())) {
+ found = true;
+ hasdown++;
+ // Downnode must actually be decommissioned
+ if (!nodes[j].isDecommissioned()) {
+ return "For block " + blk.getBlock() + " replica on " + nodes[j]
+ + " is given as downnode, " + "but is not decommissioned";
+ }
+ // TODO: Add check to verify that the Decommissioned node (if any)
+ // should only be last node in list.
+ LOG.info("Block " + blk.getBlock() + " replica on " + nodes[j]
+ + " is decommissioned.");
+ }
+ }
+ // Non-downnodes must not be decommissioned
+ if (!found && nodes[j].isDecommissioned()) {
+ return "For block " + blk.getBlock() + " replica on " + nodes[j]
+ + " is unexpectedly decommissioned";
+ }
+ }
+
+ LOG.info("Block " + blk.getBlock() + " has " + hasdown
+ + " decommissioned replica.");
+ if (Math.min(numDatanodes, repl + hasdown) != nodes.length) {
+ return "Wrong number of replicas for block " + blk.getBlock() + ": "
+ + nodes.length + ", expected "
+ + Math.min(numDatanodes, repl + hasdown);
+ }
+ }
+ return null;
+ }
+}
\ No newline at end of file