You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by da...@apache.org on 2014/07/18 19:58:07 UTC
svn commit: r1611737 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
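src/main/java/org/apache/hadoop/hdfs/server/namenode/
src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/test/java/org/apache/hadoop/hdfs/server/namenode/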
Author: daryn
Date: Fri Jul 18 17:58:07 2014
New Revision: 1611737
URL: http://svn.apache.org/r1611737
Log:
HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jul 18 17:58:07 2014
@@ -595,6 +595,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6583. Remove clientNode in FileUnderConstructionFeature. (wheat9)
+ HDFS-6599. 2.4 addBlock is 10 to 20 times slower compared to 0.23 (daryn)
+
BUG FIXES
HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Jul 18 17:58:07 2014
@@ -339,7 +339,7 @@ public class DatanodeInfo extends Datano
buffer.append("Cache Remaining: " +cr+ " ("+StringUtils.byteDesc(cr)+")"+"\n");
buffer.append("Cache Used%: "+percent2String(cacheUsedPercent) + "\n");
buffer.append("Cache Remaining%: "+percent2String(cacheRemainingPercent) + "\n");
-
+ buffer.append("Xceivers: "+getXceiverCount()+"\n");
buffer.append("Last contact: "+new Date(lastUpdate)+"\n");
return buffer.toString();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Fri Jul 18 17:58:07 2014
@@ -636,15 +636,11 @@ public class BlockPlacementPolicyDefault
// check the communication traffic of the target machine
if (considerLoad) {
- double avgLoad = 0;
- if (stats != null) {
- int size = stats.getNumDatanodesInService();
- if (size != 0) {
- avgLoad = (double)stats.getTotalLoad()/size;
- }
- }
- if (node.getXceiverCount() > (2.0 * avgLoad)) {
- logNodeIsNotChosen(storage, "the node is too busy ");
+ final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
+ final int nodeLoad = node.getXceiverCount();
+ if (nodeLoad > maxLoad) {
+ logNodeIsNotChosen(storage,
+ "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
return false;
}
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jul 18 17:58:07 2014
@@ -820,7 +820,9 @@ public class DatanodeManager {
}
/** Start decommissioning the specified datanode. */
- private void startDecommission(DatanodeDescriptor node) {
+ @InterfaceAudience.Private
+ @VisibleForTesting
+ public void startDecommission(DatanodeDescriptor node) {
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
LOG.info("Start Decommissioning " + node + " " + storage
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Fri Jul 18 17:58:07 2014
@@ -52,6 +52,12 @@ public interface DatanodeStatistics {
/** @return the xceiver count */
public int getXceiverCount();
+ /** @return total xceiver count summed over non-decommission(ing|ed) nodes */
+ public int getInServiceXceiverCount();
+
+ /** @return number of non-decommission(ing|ed) nodes */
+ public int getNumDatanodesInService();
+
/**
* @return the total used space by data nodes for non-DFS purposes
* such as storing temporary files on the local file system
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Jul 18 17:58:07 2014
@@ -151,6 +151,16 @@ class HeartbeatManager implements Datano
}
@Override
+ public synchronized int getInServiceXceiverCount() {
+ // sum of xceiver counts of nodes that are neither decommissioning nor
+ // decommissioned; callers divide by the in-service node count for an average
+ return stats.nodesInServiceXceiverCount;
+ }
+
+ @Override
+ public synchronized int getNumDatanodesInService() {
+ // count of tracked nodes that are neither decommissioning nor decommissioned
+ return stats.nodesInService;
+ }
+
+ @Override
public synchronized long getCacheCapacity() {
return stats.cacheCapacity;
}
@@ -178,7 +188,7 @@ class HeartbeatManager implements Datano
}
synchronized void register(final DatanodeDescriptor d) {
- if (!datanodes.contains(d)) {
+ if (!d.isAlive) {
addDatanode(d);
//update its timestamp
@@ -191,6 +201,8 @@ class HeartbeatManager implements Datano
}
synchronized void addDatanode(final DatanodeDescriptor d) {
+ // fold this node into the aggregate stats (capacity, block pool usage,
+ // xceiver totals and, if not decommission(ing|ed), the in-service counts)
+ stats.add(d);
datanodes.add(d);
d.isAlive = true;
}
@@ -323,6 +335,9 @@ class HeartbeatManager implements Datano
private long cacheCapacity = 0L;
private long cacheUsed = 0L;
+ private int nodesInService = 0;
+ private int nodesInServiceXceiverCount = 0;
+
private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) {
@@ -330,6 +345,8 @@ class HeartbeatManager implements Datano
blockPoolUsed += node.getBlockPoolUsed();
xceiverCount += node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService++;
+ nodesInServiceXceiverCount += node.getXceiverCount();
capacityTotal += node.getCapacity();
capacityRemaining += node.getRemaining();
} else {
@@ -344,6 +361,8 @@ class HeartbeatManager implements Datano
blockPoolUsed -= node.getBlockPoolUsed();
xceiverCount -= node.getXceiverCount();
if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+ nodesInService--;
+ nodesInServiceXceiverCount -= node.getXceiverCount();
capacityTotal -= node.getCapacity();
capacityRemaining -= node.getRemaining();
} else {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java Fri Jul 18 17:58:07 2014
@@ -48,6 +48,15 @@ public interface FSClusterStats {
* @return Number of datanodes that are both alive and not decommissioned.
*/
public int getNumDatanodesInService();
+
+ /**
+ * Indication of the average load of non-decommission(ing|ed) nodes
+ * eligible for block placement.
+ *
+ * @return average, per in-service node, of the block transfers and block
+ * writes that are currently occurring on those nodes.
+ */
+ public double getInServiceXceiverAverage();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 18 17:58:07 2014
@@ -7320,7 +7320,18 @@ public class FSNamesystem implements Nam
@Override // FSClusterStats
public int getNumDatanodesInService() {
- return getNumLiveDataNodes() - getNumDecomLiveDataNodes();
+ // read the counter kept by the datanode statistics instead of computing
+ // live nodes minus decommissioned live nodes on every call
+ return datanodeStatistics.getNumDatanodesInService();
+ }
+
+ /**
+ * Average xceiver load of in-service nodes: the in-service xceiver total
+ * divided by the in-service node count, or 0 when no node is in service.
+ */
+ @Override // for block placement strategy
+ public double getInServiceXceiverAverage() {
+ double avgLoad = 0;
+ final int nodes = getNumDatanodesInService();
+ // guard against division by zero when no node is in service
+ if (nodes != 0) {
+ // NOTE(review): the node count above and the xceiver total below come
+ // from two separate calls, so they may reflect different heartbeat
+ // updates; presumably acceptable for a load heuristic -- confirm.
+ final int xceivers = datanodeStatistics.getInServiceXceiverCount();
+ avgLoad = (double)xceivers/nodes;
+ }
+ return avgLoad;
}
public SnapshotManager getSnapshotManager() {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java Fri Jul 18 17:58:07 2014
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.test.PathUtils;
@@ -101,6 +102,7 @@ public class TestReplicationPolicyConsid
}
}
+ private final double EPSILON = 0.0001;
/**
* Tests that chooseTarget with considerLoad set to true correctly calculates
* load with decommissioned nodes.
@@ -109,14 +111,6 @@ public class TestReplicationPolicyConsid
public void testChooseTargetWithDecomNodes() throws IOException {
namenode.getNamesystem().writeLock();
try {
- // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
- // returns false
- for (int i = 0; i < 3; i++) {
- DatanodeInfo d = dnManager.getDatanodeByXferAddr(
- dnrList.get(i).getIpAddr(),
- dnrList.get(i).getXferPort());
- d.setDecommissioned();
- }
String blockPoolId = namenode.getNamesystem().getBlockPoolId();
dnManager.handleHeartbeat(dnrList.get(3),
BlockManagerTestUtil.getStorageReportsForDatanode(dataNodes[3]),
@@ -133,6 +127,20 @@ public class TestReplicationPolicyConsid
blockPoolId, dataNodes[5].getCacheCapacity(),
dataNodes[5].getCacheRemaining(),
4, 0, 0);
+ // value in the above heartbeats
+ final int load = 2 + 4 + 4;
+
+ FSNamesystem fsn = namenode.getNamesystem();
+ assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
+
+ // Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
+ // returns false
+ for (int i = 0; i < 3; i++) {
+ DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
+ dnManager.startDecommission(d);
+ d.setDecommissioned();
+ }
+ assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
// Call chooseTarget()
DatanodeStorageInfo[] targets = namenode.getNamesystem().getBlockManager()
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java?rev=1611737&r1=1611736&r2=1611737&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java Fri Jul 18 17:58:07 2014
@@ -18,9 +18,11 @@
package org.apache.hadoop.hdfs.server.namenode;
-import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY;
+import static org.junit.Assert.*;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -28,12 +30,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.junit.Test;
@@ -153,4 +164,177 @@ public class TestNamenodeCapacityReport
if (cluster != null) {cluster.shutdown();}
}
}
+
+ private static final float EPSILON = 0.0001f;
+ /**
+ * Verifies the live and in-service node counts, the total load and the
+ * in-service xceiver average as datanodes die, re-register, open write
+ * pipelines, get decommissioned, close their streams and finally shut down.
+ */
+ @Test
+ public void testXceiverCount() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ // don't waste time retrying if close fails
+ conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
+ MiniDFSCluster cluster = null;
+
+ final int nodes = 8;
+ final int fileCount = 5;
+ final short fileRepl = 3;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(nodes).build();
+ cluster.waitActive();
+
+ final FSNamesystem namesystem = cluster.getNamesystem();
+ final DatanodeManager dnm = namesystem.getBlockManager().getDatanodeManager();
+ List<DataNode> datanodes = cluster.getDataNodes();
+ final DistributedFileSystem fs = cluster.getFileSystem();
+
+ // trigger heartbeats in case not already sent
+ triggerHeartbeats(datanodes);
+
+ // check that all nodes are live and in service
+ int expectedTotalLoad = nodes; // xceiver server adds 1 to load
+ int expectedInServiceNodes = nodes;
+ int expectedInServiceLoad = nodes;
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ // divide by the node count (not the load itself) so the average is checked
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+
+ // shutdown half the nodes and force a heartbeat check to ensure
+ // counts are accurate
+ for (int i=0; i < nodes/2; i++) {
+ DataNode dn = datanodes.get(i);
+ DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
+ dn.shutdown();
+ dnd.setLastUpdate(0L);
+ BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+ expectedInServiceNodes--;
+ assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ }
+
+ // restart the nodes to verify that counts are correct after
+ // node re-registration
+ cluster.restartDataNodes();
+ cluster.waitActive();
+ datanodes = cluster.getDataNodes();
+ expectedInServiceNodes = nodes;
+ assertEquals(nodes, datanodes.size());
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+
+ // create streams and hsync to force datastreamers to start
+ DFSOutputStream[] streams = new DFSOutputStream[fileCount];
+ for (int i=0; i < fileCount; i++) {
+ streams[i] = (DFSOutputStream)fs.create(new Path("/f"+i), fileRepl)
+ .getWrappedStream();
+ streams[i].write("1".getBytes());
+ streams[i].hsync();
+ // the load for writers is 2 because both the write xceiver & packet
+ // responder threads are counted in the load
+ expectedTotalLoad += 2*fileRepl;
+ expectedInServiceLoad += 2*fileRepl;
+ }
+ // force nodes to send load update
+ triggerHeartbeats(datanodes);
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes,
+ namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+
+ // decomm a few nodes, subtract their load from the expected load,
+ // trigger heartbeat to force load update
+ for (int i=0; i < fileRepl; i++) {
+ expectedInServiceNodes--;
+ DatanodeDescriptor dnd =
+ dnm.getDatanode(datanodes.get(i).getDatanodeId());
+ expectedInServiceLoad -= dnd.getXceiverCount();
+ dnm.startDecommission(dnd);
+ DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
+ Thread.sleep(100);
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes,
+ namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+ }
+
+ // check expected load while closing each stream. recalc expected
+ // load based on whether the nodes in the pipeline are decomm
+ for (int i=0; i < fileCount; i++) {
+ int decomm = 0;
+ for (DatanodeInfo dni : streams[i].getPipeline()) {
+ DatanodeDescriptor dnd = dnm.getDatanode(dni);
+ expectedTotalLoad -= 2;
+ if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
+ decomm++;
+ } else {
+ expectedInServiceLoad -= 2;
+ }
+ }
+ try {
+ streams[i].close();
+ } catch (IOException ioe) {
+ // nodes will go decommissioned even if there's a UC block whose
+ // other locations are decommissioned too. we'll ignore that
+ // bug for now
+ if (decomm < fileRepl) {
+ throw ioe;
+ }
+ }
+ triggerHeartbeats(datanodes);
+ // verify node count and loads
+ assertEquals(nodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes,
+ namesystem.getNumDatanodesInService());
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
+ assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+ }
+
+ // shutdown each node, verify node counts based on decomm state
+ for (int i=0; i < nodes; i++) {
+ DataNode dn = datanodes.get(i);
+ dn.shutdown();
+ // force it to appear dead so live count decreases
+ DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
+ dnDesc.setLastUpdate(0L);
+ BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
+ assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
+ // first few nodes are already out of service
+ if (i >= fileRepl) {
+ expectedInServiceNodes--;
+ }
+ assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+
+ // live nodes always report load of 1. no nodes is load 0
+ double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
+ assertEquals(expectedXceiverAvg,
+ namesystem.getInServiceXceiverAverage(), EPSILON);
+ }
+
+ // final sanity check
+ assertEquals(0, namesystem.getNumLiveDataNodes());
+ assertEquals(0, namesystem.getNumDatanodesInService());
+ assertEquals(0, namesystem.getTotalLoad());
+ assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Asks every given datanode to send a heartbeat, then pauses briefly.
+ * NOTE(review): the fixed 100 ms sleep presumably gives the NameNode time
+ * to process the heartbeats before callers assert; confirm it is enough.
+ */
+ private void triggerHeartbeats(List<DataNode> datanodes)
+ throws IOException, InterruptedException {
+ for (DataNode dn : datanodes) {
+ DataNodeTestUtils.triggerHeartbeat(dn);
+ }
+ Thread.sleep(100);
+ }
}