Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/12 04:47:46 UTC
[24/25] hadoop git commit: HDFS-7375. Move FSClusterStats to o.a.h.h.hdfs.server.blockmanagement. Contributed by Haohui Mai.
HDFS-7375. Move FSClusterStats to o.a.h.h.hdfs.server.blockmanagement. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/46f6f9d6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/46f6f9d6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/46f6f9d6
Branch: refs/heads/HDFS-EC
Commit: 46f6f9d60d0a2c1f441a0e81a071b08c24dbd6d6
Parents: 163bb55
Author: Haohui Mai <wh...@apache.org>
Authored: Tue Nov 11 18:22:40 2014 -0800
Committer: Haohui Mai <wh...@apache.org>
Committed: Tue Nov 11 18:22:40 2014 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../server/blockmanagement/BlockManager.java | 14 +--
.../blockmanagement/BlockPlacementPolicy.java | 1 -
.../BlockPlacementPolicyDefault.java | 1 -
.../BlockPlacementPolicyWithNodeGroup.java | 1 -
.../server/blockmanagement/DatanodeManager.java | 40 +++++-
.../server/blockmanagement/FSClusterStats.java | 60 +++++++++
.../hdfs/server/namenode/FSClusterStats.java | 62 ---------
.../hdfs/server/namenode/FSNamesystem.java | 28 +----
.../blockmanagement/TestBlockManager.java | 10 +-
.../blockmanagement/TestReplicationPolicy.java | 13 +-
.../TestReplicationPolicyConsiderLoad.java | 6 +-
.../server/namenode/TestNameNodeMXBean.java | 126 +++++++++++--------
.../namenode/TestNamenodeCapacityReport.java | 72 +++++------
14 files changed, 234 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
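In short, this change moves the FSClusterStats interface out of the namenode package and makes DatanodeManager the component that builds and exposes the stats view, so BlockManager no longer needs FSNamesystem to implement it. A rough sketch of the new wiring, using only names that appear in the diff below (simplified, not a verbatim excerpt of the classes):

    // BlockManager is now constructed without an FSClusterStats argument ...
    BlockManager bm = new BlockManager(namesystem, conf);

    // ... and inside its constructor the placement policy obtains the stats
    // view from DatanodeManager rather than from FSNamesystem:
    blockplacement = BlockPlacementPolicy.getInstance(
        conf, datanodeManager.getFSClusterStats(),
        datanodeManager.getNetworkTopology(),
        datanodeManager.getHost2DatanodeMap());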
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 07762bf..ea89344 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -353,6 +353,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7381. Decouple the management of block id and gen stamps from
FSNamesystem. (wheat9)
+ HDFS-7375. Move FSClusterStats to o.a.h.h.hdfs.server.blockmanagement.
+ (wheat9)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5531400..b8dcd88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -66,7 +66,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
@@ -111,7 +110,7 @@ public class BlockManager {
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
private final BlockTokenSecretManager blockTokenSecretManager;
-
+
private final PendingDataNodeMessages pendingDNMessages =
new PendingDataNodeMessages();
@@ -264,9 +263,9 @@ public class BlockManager {
/** Check whether name system is running before terminating */
private boolean checkNSRunning = true;
-
- public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
- final Configuration conf) throws IOException {
+
+ public BlockManager(final Namesystem namesystem, final Configuration conf)
+ throws IOException {
this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
@@ -281,8 +280,9 @@ public class BlockManager {
blocksMap = new BlocksMap(
LightWeightGSet.computeCapacity(2.0, "BlocksMap"));
blockplacement = BlockPlacementPolicy.getInstance(
- conf, stats, datanodeManager.getNetworkTopology(),
- datanodeManager.getHost2DatanodeMap());
+ conf, datanodeManager.getFSClusterStats(),
+ datanodeManager.getNetworkTopology(),
+ datanodeManager.getHost2DatanodeMap());
storagePolicySuite = BlockStoragePolicySuite.createDefaultSuite();
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
index 26a55a2..caeb6ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
index 5b02384..30ab5a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
index 60e192b..19fcb14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NetworkTopologyWithNodeGroup;
import org.apache.hadoop.net.Node;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 3b03d1d..d19aad7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -66,6 +66,7 @@ public class DatanodeManager {
private final Namesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager heartbeatManager;
+ private final FSClusterStats fsClusterStats;
private Daemon decommissionthread = null;
/**
@@ -169,7 +170,7 @@ public class DatanodeManager {
* directives that we've already sent.
*/
private final long timeBetweenResendingCachingDirectivesMs;
-
+
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -178,6 +179,7 @@ public class DatanodeManager {
networktopology = NetworkTopology.getInstance(conf);
this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
+ this.fsClusterStats = newFSClusterStats();
this.defaultXferPort = NetUtils.createSocketAddr(
conf.get(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY,
@@ -329,6 +331,11 @@ public class DatanodeManager {
return heartbeatManager;
}
+ @VisibleForTesting
+ public FSClusterStats getFSClusterStats() {
+ return fsClusterStats;
+ }
+
/** @return the datanode statistics. */
public DatanodeStatistics getDatanodeStatistics() {
return heartbeatManager;
@@ -1595,5 +1602,36 @@ public class DatanodeManager {
public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
this.shouldSendCachingCommands = shouldSendCachingCommands;
}
+
+ FSClusterStats newFSClusterStats() {
+ return new FSClusterStats() {
+ @Override
+ public int getTotalLoad() {
+ return heartbeatManager.getXceiverCount();
+ }
+
+ @Override
+ public boolean isAvoidingStaleDataNodesForWrite() {
+ return shouldAvoidStaleDataNodesForWrite();
+ }
+
+ @Override
+ public int getNumDatanodesInService() {
+ return heartbeatManager.getNumDatanodesInService();
+ }
+
+ @Override
+ public double getInServiceXceiverAverage() {
+ double avgLoad = 0;
+ final int nodes = getNumDatanodesInService();
+ if (nodes != 0) {
+ final int xceivers = heartbeatManager
+ .getInServiceXceiverCount();
+ avgLoad = (double)xceivers/nodes;
+ }
+ return avgLoad;
+ }
+ };
+ }
}
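The anonymous FSClusterStats returned by newFSClusterStats() above simply delegates to the HeartbeatManager, and getInServiceXceiverAverage() guards against an empty cluster: for example, 12 in-service xceivers spread over 4 in-service datanodes gives an average load of 3.0, while zero in-service datanodes yields 0.0 rather than a division by zero. (The numbers here are illustrative only.)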
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
new file mode 100644
index 0000000..556b7fc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/FSClusterStats.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This interface is used for retrieving the load related statistics of
+ * the cluster.
+ */
+@InterfaceAudience.Private
+public interface FSClusterStats {
+
+ /**
+ * an indication of the total load of the cluster.
+ *
+ * @return a count of the total number of block transfers and block
+ * writes that are currently occuring on the cluster.
+ */
+ public int getTotalLoad();
+
+ /**
+ * Indicate whether or not the cluster is now avoiding
+ * to use stale DataNodes for writing.
+ *
+ * @return True if the cluster is currently avoiding using stale DataNodes
+ * for writing targets, and false otherwise.
+ */
+ public boolean isAvoidingStaleDataNodesForWrite();
+
+ /**
+ * Indicates number of datanodes that are in service.
+ * @return Number of datanodes that are both alive and not decommissioned.
+ */
+ public int getNumDatanodesInService();
+
+ /**
+ * an indication of the average load of non-decommission(ing|ed) nodes
+ * eligible for block placement
+ *
+ * @return average of the in service number of block transfers and block
+ * writes that are currently occurring on the cluster.
+ */
+ public double getInServiceXceiverAverage();
+}
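Because FSClusterStats is now a small standalone interface in the blockmanagement package, a placement-policy test no longer has to mock FSNamesystem to supply it. A minimal stub, for illustration only and not part of this commit:

    // Hypothetical test stub implementing the interface defined above.
    FSClusterStats stats = new FSClusterStats() {
      @Override
      public int getTotalLoad() { return 0; }              // no transfers in flight
      @Override
      public boolean isAvoidingStaleDataNodesForWrite() { return false; }
      @Override
      public int getNumDatanodesInService() { return 1; }  // one live, non-decommissioned node
      @Override
      public double getInServiceXceiverAverage() { return 0.0; }
    };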
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
deleted file mode 100644
index 1a859a7..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSClusterStats.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * This interface is used for retrieving the load related statistics of
- * the cluster.
- */
-@InterfaceAudience.Private
-public interface FSClusterStats {
-
- /**
- * an indication of the total load of the cluster.
- *
- * @return a count of the total number of block transfers and block
- * writes that are currently occuring on the cluster.
- */
- public int getTotalLoad();
-
- /**
- * Indicate whether or not the cluster is now avoiding
- * to use stale DataNodes for writing.
- *
- * @return True if the cluster is currently avoiding using stale DataNodes
- * for writing targets, and false otherwise.
- */
- public boolean isAvoidingStaleDataNodesForWrite();
-
- /**
- * Indicates number of datanodes that are in service.
- * @return Number of datanodes that are both alive and not decommissioned.
- */
- public int getNumDatanodesInService();
-
- /**
- * an indication of the average load of non-decommission(ing|ed) nodes
- * eligible for block placement
- *
- * @return average of the in service number of block transfers and block
- * writes that are currently occurring on the cluster.
- */
- public double getInServiceXceiverAverage();
-}
-
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index b086390..f1ea818 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -318,8 +318,8 @@ import com.google.common.collect.Lists;
*/
@InterfaceAudience.Private
@Metrics(context="dfs")
-public class FSNamesystem implements Namesystem, FSClusterStats,
- FSNamesystemMBean, NameNodeMXBean {
+public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ NameNodeMXBean {
public static final Log LOG = LogFactory.getLog(FSNamesystem.class);
private static final ThreadLocal<StringBuilder> auditBuffer =
@@ -765,7 +765,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
- this.blockManager = new BlockManager(this, this, conf);
+ this.blockManager = new BlockManager(this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.blockIdManager = new BlockIdManager(blockManager);
@@ -7818,28 +7818,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
this.nnResourceChecker = nnResourceChecker;
}
- @Override
- public boolean isAvoidingStaleDataNodesForWrite() {
- return this.blockManager.getDatanodeManager()
- .shouldAvoidStaleDataNodesForWrite();
- }
-
- @Override // FSClusterStats
- public int getNumDatanodesInService() {
- return datanodeStatistics.getNumDatanodesInService();
- }
-
- @Override // for block placement strategy
- public double getInServiceXceiverAverage() {
- double avgLoad = 0;
- final int nodes = getNumDatanodesInService();
- if (nodes != 0) {
- final int xceivers = datanodeStatistics.getInServiceXceiverCount();
- avgLoad = (double)xceivers/nodes;
- }
- return avgLoad;
- }
-
public SnapshotManager getSnapshotManager() {
return snapshotManager;
}
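With FSNamesystem no longer implementing FSClusterStats, callers that used to read these metrics directly off the namesystem now reach them through the DatanodeManager, as the test updates below show (e.g. TestReplicationPolicyConsiderLoad and TestNamenodeCapacityReport):

    // New access path used by the tests:
    double avg = fsn.getBlockManager().getDatanodeManager()
        .getFSClusterStats().getInServiceXceiverAverage();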
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 14f2b59..3df890f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -81,19 +81,17 @@ public class TestBlockManager {
private static final int NUM_TEST_ITERS = 30;
private static final int BLOCK_SIZE = 64*1024;
-
- private Configuration conf;
+
private FSNamesystem fsn;
private BlockManager bm;
@Before
public void setupMockCluster() throws IOException {
- conf = new HdfsConfiguration();
- conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
- "need to set a dummy value here so it assumes a multi-rack cluster");
+ Configuration conf = new HdfsConfiguration();
+ conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, "need to set a dummy value here so it assumes a multi-rack cluster");
fsn = Mockito.mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock();
- bm = new BlockManager(fsn, fsn, conf);
+ bm = new BlockManager(fsn, conf);
final String[] racks = {
"/rackA",
"/rackA",
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index 4febd28..ce2328c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -1145,9 +1144,7 @@ public class TestReplicationPolicy {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
when(mockNS.hasWriteLock()).thenReturn(true);
- FSClusterStats mockStats = mock(FSClusterStats.class);
- BlockManager bm =
- new BlockManager(mockNS, mockStats, new HdfsConfiguration());
+ BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
@@ -1193,9 +1190,7 @@ public class TestReplicationPolicy {
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
- FSClusterStats mockStats = mock(FSClusterStats.class);
- BlockManager bm =
- new BlockManager(mockNS, mockStats, new HdfsConfiguration());
+ BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
@@ -1248,9 +1243,7 @@ public class TestReplicationPolicy {
throws IOException {
Namesystem mockNS = mock(Namesystem.class);
when(mockNS.isPopulatingReplQueues()).thenReturn(true);
- FSClusterStats mockStats = mock(FSClusterStats.class);
- BlockManager bm =
- new BlockManager(mockNS, mockStats, new HdfsConfiguration());
+ BlockManager bm = new BlockManager(mockNS, new HdfsConfiguration());
UnderReplicatedBlocks underReplicatedBlocks = bm.neededReplications;
Block block1 = new Block(random.nextLong());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
index 8a479c1..a1f3e38 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
@@ -130,7 +130,8 @@ public class TestReplicationPolicyConsiderLoad {
final int load = 2 + 4 + 4;
FSNamesystem fsn = namenode.getNamesystem();
- assertEquals((double)load/6, fsn.getInServiceXceiverAverage(), EPSILON);
+ assertEquals((double)load/6, dnManager.getFSClusterStats()
+ .getInServiceXceiverAverage(), EPSILON);
// Decommission DNs so BlockPlacementPolicyDefault.isGoodTarget()
// returns false
@@ -139,7 +140,8 @@ public class TestReplicationPolicyConsiderLoad {
dnManager.startDecommission(d);
d.setDecommissioned();
}
- assertEquals((double)load/3, fsn.getInServiceXceiverAverage(), EPSILON);
+ assertEquals((double)load/3, dnManager.getFSClusterStats()
+ .getInServiceXceiverAverage(), EPSILON);
// update references of writer DN to update the de-commissioned state
List<DatanodeDescriptor> liveNodes = new ArrayList<DatanodeDescriptor>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
index 4e07854..fa9dca1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeMXBean.java
@@ -17,18 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.lang.management.ManagementFactory;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-
+import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -43,6 +32,18 @@ import org.apache.hadoop.util.VersionInfo;
import org.junit.Test;
import org.mortbay.util.ajax.JSON;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.io.File;
+import java.lang.management.ManagementFactory;
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
/**
* Class for testing {@link NameNodeMXBean} implementation
*/
@@ -62,14 +63,11 @@ public class TestNameNodeMXBean {
public void testNameNodeMXBeanInfo() throws Exception {
Configuration conf = new Configuration();
conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
- NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
- conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
-
+ NativeIO.POSIX.getCacheManipulator().getMemlockLimit());
MiniDFSCluster cluster = null;
try {
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
FSNamesystem fsn = cluster.getNameNode().namesystem;
@@ -77,29 +75,6 @@ public class TestNameNodeMXBean {
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(
"Hadoop:service=NameNode,name=NameNodeInfo");
-
- // Define include file to generate deadNodes metrics
- FileSystem localFileSys = FileSystem.getLocal(conf);
- Path workingDir = localFileSys.getWorkingDirectory();
- Path dir = new Path(workingDir,
- "build/test/data/temp/TestNameNodeMXBean");
- Path includeFile = new Path(dir, "include");
- assertTrue(localFileSys.mkdirs(dir));
- StringBuilder includeHosts = new StringBuilder();
- for(DataNode dn : cluster.getDataNodes()) {
- includeHosts.append(dn.getDisplayName()).append("\n");
- }
- DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
- conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
- fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
-
- cluster.stopDataNode(0);
- while (fsn.getNumDatanodesInService() != 2) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {}
- }
-
// get attribute "ClusterId"
String clusterId = (String) mbs.getAttribute(mxbeanName, "ClusterId");
assertEquals(fsn.getClusterId(), clusterId);
@@ -127,8 +102,7 @@ public class TestNameNodeMXBean {
// get attribute percentremaining
Float percentremaining = (Float) (mbs.getAttribute(mxbeanName,
"PercentRemaining"));
- assertEquals(fsn.getPercentRemaining(), percentremaining
- .floatValue(), DELTA);
+ assertEquals(fsn.getPercentRemaining(), percentremaining, DELTA);
// get attribute Totalblocks
Long totalblocks = (Long) (mbs.getAttribute(mxbeanName, "TotalBlocks"));
assertEquals(fsn.getTotalBlocks(), totalblocks.longValue());
@@ -151,15 +125,6 @@ public class TestNameNodeMXBean {
String deadnodeinfo = (String) (mbs.getAttribute(mxbeanName,
"DeadNodes"));
assertEquals(fsn.getDeadNodes(), deadnodeinfo);
- Map<String, Map<String, Object>> deadNodes =
- (Map<String, Map<String, Object>>) JSON.parse(deadnodeinfo);
- assertTrue(deadNodes.size() > 0);
- for (Map<String, Object> deadNode : deadNodes.values()) {
- assertTrue(deadNode.containsKey("lastContact"));
- assertTrue(deadNode.containsKey("decommissioned"));
- assertTrue(deadNode.containsKey("xferaddr"));
- }
-
// get attribute NodeUsage
String nodeUsage = (String) (mbs.getAttribute(mxbeanName,
"NodeUsage"));
@@ -233,4 +198,63 @@ public class TestNameNodeMXBean {
}
}
}
+
+ @SuppressWarnings({ "unchecked" })
+ @Test
+ public void testLastContactTime() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 1);
+ MiniDFSCluster cluster = null;
+
+ try {
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ cluster.waitActive();
+
+ FSNamesystem fsn = cluster.getNameNode().namesystem;
+
+ MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+ ObjectName mxbeanName = new ObjectName(
+ "Hadoop:service=NameNode,name=NameNodeInfo");
+
+ // Define include file to generate deadNodes metrics
+ FileSystem localFileSys = FileSystem.getLocal(conf);
+ Path workingDir = localFileSys.getWorkingDirectory();
+ Path dir = new Path(workingDir,
+ "build/test/data/temp/TestNameNodeMXBean");
+ Path includeFile = new Path(dir, "include");
+ assertTrue(localFileSys.mkdirs(dir));
+ StringBuilder includeHosts = new StringBuilder();
+ for(DataNode dn : cluster.getDataNodes()) {
+ includeHosts.append(dn.getDisplayName()).append("\n");
+ }
+ DFSTestUtil.writeFile(localFileSys, includeFile, includeHosts.toString());
+ conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
+ fsn.getBlockManager().getDatanodeManager().refreshNodes(conf);
+
+ cluster.stopDataNode(0);
+ while (fsn.getBlockManager().getDatanodeManager().getNumLiveDataNodes()
+ != 2 ) {
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+
+ // get attribute deadnodeinfo
+ String deadnodeinfo = (String) (mbs.getAttribute(mxbeanName,
+ "DeadNodes"));
+ assertEquals(fsn.getDeadNodes(), deadnodeinfo);
+ Map<String, Map<String, Object>> deadNodes =
+ (Map<String, Map<String, Object>>) JSON.parse(deadnodeinfo);
+ assertTrue(deadNodes.size() > 0);
+ for (Map<String, Object> deadNode : deadNodes.values()) {
+ assertTrue(deadNode.containsKey("lastContact"));
+ assertTrue(deadNode.containsKey("decommissioned"));
+ assertTrue(deadNode.containsKey("xferaddr"));
+ }
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/46f6f9d6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
index 15cad04..426563b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
@@ -193,11 +193,7 @@ public class TestNamenodeCapacityReport {
int expectedTotalLoad = nodes; // xceiver server adds 1 to load
int expectedInServiceNodes = nodes;
int expectedInServiceLoad = nodes;
- assertEquals(nodes, namesystem.getNumLiveDataNodes());
- assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
- assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
- assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
- namesystem.getInServiceXceiverAverage(), EPSILON);
+ checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
// shutdown half the nodes and force a heartbeat check to ensure
// counts are accurate
@@ -209,7 +205,7 @@ public class TestNamenodeCapacityReport {
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
expectedInServiceNodes--;
assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
- assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
}
// restart the nodes to verify that counts are correct after
@@ -219,11 +215,7 @@ public class TestNamenodeCapacityReport {
datanodes = cluster.getDataNodes();
expectedInServiceNodes = nodes;
assertEquals(nodes, datanodes.size());
- assertEquals(nodes, namesystem.getNumLiveDataNodes());
- assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
- assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
- assertEquals((double)expectedInServiceLoad/expectedInServiceLoad,
- namesystem.getInServiceXceiverAverage(), EPSILON);
+ checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
// create streams and hsync to force datastreamers to start
DFSOutputStream[] streams = new DFSOutputStream[fileCount];
@@ -239,12 +231,7 @@ public class TestNamenodeCapacityReport {
}
// force nodes to send load update
triggerHeartbeats(datanodes);
- assertEquals(nodes, namesystem.getNumLiveDataNodes());
- assertEquals(expectedInServiceNodes,
- namesystem.getNumDatanodesInService());
- assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
- assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
- namesystem.getInServiceXceiverAverage(), EPSILON);
+ checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
// decomm a few nodes, substract their load from the expected load,
// trigger heartbeat to force load update
@@ -256,12 +243,7 @@ public class TestNamenodeCapacityReport {
dnm.startDecommission(dnd);
DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
Thread.sleep(100);
- assertEquals(nodes, namesystem.getNumLiveDataNodes());
- assertEquals(expectedInServiceNodes,
- namesystem.getNumDatanodesInService());
- assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
- assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
- namesystem.getInServiceXceiverAverage(), EPSILON);
+ checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
}
// check expected load while closing each stream. recalc expected
@@ -289,12 +271,7 @@ public class TestNamenodeCapacityReport {
}
triggerHeartbeats(datanodes);
// verify node count and loads
- assertEquals(nodes, namesystem.getNumLiveDataNodes());
- assertEquals(expectedInServiceNodes,
- namesystem.getNumDatanodesInService());
- assertEquals(expectedTotalLoad, namesystem.getTotalLoad());
- assertEquals((double)expectedInServiceLoad/expectedInServiceNodes,
- namesystem.getInServiceXceiverAverage(), EPSILON);
+ checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
}
// shutdown each node, verify node counts based on decomm state
@@ -310,26 +287,49 @@ public class TestNamenodeCapacityReport {
if (i >= fileRepl) {
expectedInServiceNodes--;
}
- assertEquals(expectedInServiceNodes, namesystem.getNumDatanodesInService());
+ assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
// live nodes always report load of 1. no nodes is load 0
double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
assertEquals((double)expectedXceiverAvg,
- namesystem.getInServiceXceiverAverage(), EPSILON);
+ getInServiceXceiverAverage(namesystem), EPSILON);
}
// final sanity check
- assertEquals(0, namesystem.getNumLiveDataNodes());
- assertEquals(0, namesystem.getNumDatanodesInService());
- assertEquals(0.0, namesystem.getTotalLoad(), EPSILON);
- assertEquals(0.0, namesystem.getInServiceXceiverAverage(), EPSILON);
+ checkClusterHealth(0, namesystem, 0.0, 0, 0.0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
-
+
+ private static void checkClusterHealth(
+ int numOfLiveNodes,
+ FSNamesystem namesystem, double expectedTotalLoad,
+ int expectedInServiceNodes, double expectedInServiceLoad) {
+
+ assertEquals(numOfLiveNodes, namesystem.getNumLiveDataNodes());
+ assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
+ assertEquals(expectedTotalLoad, namesystem.getTotalLoad(), EPSILON);
+ if (expectedInServiceNodes != 0) {
+ assertEquals(expectedInServiceLoad / expectedInServiceNodes,
+ getInServiceXceiverAverage(namesystem), EPSILON);
+ } else {
+ assertEquals(0.0, getInServiceXceiverAverage(namesystem), EPSILON);
+ }
+ }
+
+ private static int getNumDNInService(FSNamesystem fsn) {
+ return fsn.getBlockManager().getDatanodeManager().getFSClusterStats()
+ .getNumDatanodesInService();
+ }
+
+ private static double getInServiceXceiverAverage(FSNamesystem fsn) {
+ return fsn.getBlockManager().getDatanodeManager().getFSClusterStats()
+ .getInServiceXceiverAverage();
+ }
+
private void triggerHeartbeats(List<DataNode> datanodes)
throws IOException, InterruptedException {
for (DataNode dn : datanodes) {