You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/06/30 21:01:23 UTC
[10/27] hadoop git commit: HDFS-7390. Provide JMX metrics per storage
type. (Benoy Antony)
HDFS-7390. Provide JMX metrics per storage type. (Benoy Antony)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3fed8e6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3fed8e6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3fed8e6
Branch: refs/heads/HADOOP-12111
Commit: d3fed8e653ed9e18d3a29a11c4b24a628ac770bb
Parents: fde20ff
Author: Benoy Antony <be...@apache.org>
Authored: Mon Jun 29 11:00:22 2015 -0700
Committer: Benoy Antony <be...@apache.org>
Committed: Mon Jun 29 11:00:22 2015 -0700
----------------------------------------------------------------------
.../server/blockmanagement/BlockManager.java | 15 +-
.../blockmanagement/BlockStatsMXBean.java | 36 +++++
.../blockmanagement/DatanodeStatistics.java | 6 +
.../blockmanagement/HeartbeatManager.java | 96 +++++++++++-
.../blockmanagement/StorageTypeStats.java | 115 +++++++++++++++
.../blockmanagement/TestBlockStatsMXBean.java | 146 +++++++++++++++++++
6 files changed, 412 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fed8e6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5bd4980..0b60a97 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -39,6 +39,8 @@ import java.util.TreeSet;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
+import javax.management.ObjectName;
+
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -85,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Daemon;
@@ -94,6 +97,7 @@ import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +105,7 @@ import org.slf4j.LoggerFactory;
* Keeps information related to the blocks stored in the Hadoop cluster.
*/
@InterfaceAudience.Private
-public class BlockManager {
+public class BlockManager implements BlockStatsMXBean {
public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class);
public static final Logger blockLog = NameNode.blockStateChangeLog;
@@ -129,6 +133,7 @@ public class BlockManager {
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
private final long startupDelayBlockDeletionInMs;
private final BlockReportLeaseManager blockReportLeaseManager;
+ private ObjectName mxBeanName;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
@@ -468,6 +473,7 @@ public class BlockManager {
pendingReplications.start();
datanodeManager.activate(conf);
this.replicationThread.start();
+ mxBeanName = MBeans.register("NameNode", "BlockStats", this);
}
public void close() {
@@ -3944,6 +3950,8 @@ public class BlockManager {
public void shutdown() {
stopReplicationInitializer();
blocksMap.close();
+ MBeans.unregister(mxBeanName);
+ mxBeanName = null;
}
public void clear() {
@@ -3954,4 +3962,9 @@ public class BlockManager {
public BlockReportLeaseManager getBlockReportLeaseManager() {
return blockReportLeaseManager;
}
+
+ /**
+ * Returns the per-storage-type statistics exposed through
+ * {@link BlockStatsMXBean}. The value is obtained from the datanode
+ * statistics held by the datanode manager.
+ *
+ * @return statistics keyed by storage type
+ */
+ @Override // BlockStatsMXBean
+ public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
+ return datanodeManager.getDatanodeStatistics().getStorageTypeStats();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fed8e6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java
new file mode 100644
index 0000000..f22c537
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Map;
+
+import org.apache.hadoop.fs.StorageType;
+
+/**
+ * This is an interface used to retrieve statistic information related to
+ * block management. Its only attribute is the map of per-storage-type
+ * statistics returned by {@link #getStorageTypeStats()}.
+ */
+public interface BlockStatsMXBean {
+
+ /**
+ * The statistics of storage types.
+ *
+ * @return the storage statistics, keyed by storage type
+ */
+ Map<StorageType, StorageTypeStats> getStorageTypeStats();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fed8e6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
index c9bc3e5..33eca2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
+import java.util.Map;
+
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
/** Datanode statistics */
@@ -71,4 +74,7 @@ public interface DatanodeStatistics {
/** @return the expired heartbeats */
public int getExpiredHeartbeats();
+
+ /** @return the statistics of each storage type, keyed by storage type */
+ Map<StorageType, StorageTypeStats> getStorageTypeStats();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fed8e6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
index 9017fe1..cc9365d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
@@ -18,9 +18,15 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -189,6 +195,11 @@ class HeartbeatManager implements DatanodeStatistics {
return stats.expiredHeartbeats;
}
+  /**
+   * Returns a point-in-time snapshot of the per-storage-type statistics.
+   *
+   * <p>The underlying map and its values are updated in place whenever a
+   * datanode is added to or subtracted from the statistics. Handing out a
+   * live view would let a reader (for example a JMX query of the BlockStats
+   * bean) iterate the map while it is being modified, so the statistics are
+   * deep-copied here while holding this object's monitor.
+   *
+   * <p>NOTE(review): this assumes every update of {@code stats} is made
+   * under this object's monitor, as {@code register()} does - confirm for
+   * the remaining callers.
+   *
+   * @return an unmodifiable copy of the statistics, keyed by storage type
+   */
+  @Override
+  public synchronized Map<StorageType, StorageTypeStats> getStorageTypeStats() {
+    // The copy constructor duplicates every StorageTypeStats value, so the
+    // returned map shares no mutable state with the live statistics.
+    return new StorageTypeStatsMap(stats.statsMap).get();
+  }
+
synchronized void register(final DatanodeDescriptor d) {
if (!d.isAlive) {
addDatanode(d);
@@ -393,6 +404,9 @@ class HeartbeatManager implements DatanodeStatistics {
* For decommissioning/decommissioned nodes, only used capacity is counted.
*/
private static class Stats {
+
+ private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
+
private long capacityTotal = 0L;
private long capacityUsed = 0L;
private long capacityRemaining = 0L;
@@ -420,6 +434,14 @@ class HeartbeatManager implements DatanodeStatistics {
}
cacheCapacity += node.getCacheCapacity();
cacheUsed += node.getCacheUsed();
+ Set<StorageType> storageTypes = new HashSet<>();
+ for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
+ statsMap.addStorage(storageInfo, node);
+ storageTypes.add(storageInfo.getStorageType());
+ }
+ for (StorageType storageType : storageTypes) {
+ statsMap.addNode(storageType, node);
+ }
}
private void subtract(final DatanodeDescriptor node) {
@@ -436,6 +458,14 @@ class HeartbeatManager implements DatanodeStatistics {
}
cacheCapacity -= node.getCacheCapacity();
cacheUsed -= node.getCacheUsed();
+ Set<StorageType> storageTypes = new HashSet<>();
+ for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
+ statsMap.subtractStorage(storageInfo, node);
+ storageTypes.add(storageInfo.getStorageType());
+ }
+ for (StorageType storageType : storageTypes) {
+ statsMap.subtractNode(storageType, node);
+ }
}
/** Increment expired heartbeat counter. */
@@ -443,5 +473,69 @@ class HeartbeatManager implements DatanodeStatistics {
expiredHeartbeats++;
}
}
-}
+  /**
+   * Statistics broken down by {@link StorageType}: one
+   * {@link StorageTypeStats} entry per storage type, created the first time
+   * a node or a storage of that type is added.
+   * For decommissioning/decommissioned nodes, only used capacity is counted.
+   */
+  static final class StorageTypeStatsMap {
+
+    private Map<StorageType, StorageTypeStats> storageTypeStatsMap =
+        new IdentityHashMap<>();
+
+    private StorageTypeStatsMap() {}
+
+    /**
+     * Copy constructor. Every {@link StorageTypeStats} value is copied as
+     * well, so the new instance shares no statistics objects with
+     * {@code other}.
+     */
+    private StorageTypeStatsMap(StorageTypeStatsMap other) {
+      final Map<StorageType, StorageTypeStats> copies =
+          new IdentityHashMap<>(other.storageTypeStatsMap);
+      for (Map.Entry<StorageType, StorageTypeStats> slot : copies.entrySet()) {
+        slot.setValue(new StorageTypeStats(slot.getValue()));
+      }
+      storageTypeStatsMap = copies;
+    }
+
+    /** @return a read-only view of the statistics, keyed by storage type */
+    private Map<StorageType, StorageTypeStats> get() {
+      return Collections.unmodifiableMap(storageTypeStatsMap);
+    }
+
+    /**
+     * Passes {@code node} to the statistics of {@code storageType}, creating
+     * the entry for that type if it does not exist yet.
+     */
+    private void addNode(StorageType storageType,
+        final DatanodeDescriptor node) {
+      statsFor(storageType).addNode(node);
+    }
+
+    /**
+     * Adds the storage {@code info} of {@code node} to the statistics of the
+     * storage's type, creating the entry for that type if needed.
+     */
+    private void addStorage(final DatanodeStorageInfo info,
+        final DatanodeDescriptor node) {
+      statsFor(info.getStorageType()).addStorage(info, node);
+    }
+
+    /**
+     * Removes the storage {@code info} of {@code node} from the statistics
+     * of the storage's type. Does nothing when that type has no entry.
+     */
+    private void subtractStorage(final DatanodeStorageInfo info,
+        final DatanodeDescriptor node) {
+      final StorageTypeStats existing =
+          storageTypeStatsMap.get(info.getStorageType());
+      if (existing == null) {
+        return;
+      }
+      existing.subtractStorage(info, node);
+    }
+
+    /**
+     * Removes {@code node} from the statistics of {@code storageType}.
+     * Does nothing when that type has no entry.
+     */
+    private void subtractNode(StorageType storageType,
+        final DatanodeDescriptor node) {
+      final StorageTypeStats existing = storageTypeStatsMap.get(storageType);
+      if (existing == null) {
+        return;
+      }
+      existing.subtractNode(node);
+    }
+
+    /** Looks up the entry for {@code storageType}, creating it on demand. */
+    private StorageTypeStats statsFor(StorageType storageType) {
+      StorageTypeStats found = storageTypeStatsMap.get(storageType);
+      if (found == null) {
+        found = new StorageTypeStats();
+        storageTypeStatsMap.put(storageType, found);
+      }
+      return found;
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fed8e6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
new file mode 100644
index 0000000..45dcc8d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.beans.ConstructorProperties;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Statistics per StorageType: total, used and remaining capacity, block pool
+ * usage and the number of nodes in service.
+ *
+ * <p>A node that is decommissioning or decommissioned contributes only its
+ * used space to the total capacity, contributes nothing to the remaining
+ * capacity, and is not counted as a node in service.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class StorageTypeStats {
+  private long capacityTotal = 0L;
+  private long capacityUsed = 0L;
+  private long capacityRemaining = 0L;
+  private long blockPoolUsed = 0L;
+  private int nodesInService = 0;
+
+  /** Creates statistics with every counter at zero. */
+  StorageTypeStats() {}
+
+  /** Creates a copy holding the same counter values as {@code other}. */
+  StorageTypeStats(StorageTypeStats other) {
+    this.capacityTotal = other.capacityTotal;
+    this.capacityUsed = other.capacityUsed;
+    this.capacityRemaining = other.capacityRemaining;
+    this.blockPoolUsed = other.blockPoolUsed;
+    this.nodesInService = other.nodesInService;
+  }
+
+  /**
+   * Creates statistics from explicit values. The names listed in
+   * {@code @ConstructorProperties} correspond, in order, to the getters of
+   * this class.
+   */
+  @ConstructorProperties({"capacityTotal",
+      "capacityUsed", "capacityRemaining", "blockPoolUsed", "nodesInService"})
+  public StorageTypeStats(long capacityTotal, long capacityUsed,
+      long capacityRemaining, long blockPoolUsed, int nodesInService) {
+    this.capacityTotal = capacityTotal;
+    this.capacityUsed = capacityUsed;
+    this.capacityRemaining = capacityRemaining;
+    this.blockPoolUsed = blockPoolUsed;
+    this.nodesInService = nodesInService;
+  }
+
+  /** @return the accumulated total capacity */
+  public long getCapacityTotal() {
+    return capacityTotal;
+  }
+
+  /** @return the accumulated used capacity */
+  public long getCapacityUsed() {
+    return capacityUsed;
+  }
+
+  /** @return the accumulated remaining capacity */
+  public long getCapacityRemaining() {
+    return capacityRemaining;
+  }
+
+  /** @return the accumulated block pool usage */
+  public long getBlockPoolUsed() {
+    return blockPoolUsed;
+  }
+
+  /** @return the number of nodes counted as in service */
+  public int getNodesInService() {
+    return nodesInService;
+  }
+
+  /**
+   * Adds one storage of {@code node} to the counters. Used space and block
+   * pool usage are always added; capacity and remaining space are added only
+   * for a node in service, otherwise the used space stands in for capacity.
+   */
+  void addStorage(final DatanodeStorageInfo info,
+      final DatanodeDescriptor node) {
+    capacityUsed += info.getDfsUsed();
+    blockPoolUsed += info.getBlockPoolUsed();
+    if (isInService(node)) {
+      capacityTotal += info.getCapacity();
+      capacityRemaining += info.getRemaining();
+      return;
+    }
+    capacityTotal += info.getDfsUsed();
+  }
+
+  /** Increments the in-service node count if {@code node} is in service. */
+  void addNode(final DatanodeDescriptor node) {
+    if (isInService(node)) {
+      nodesInService++;
+    }
+  }
+
+  /**
+   * Removes one storage of {@code node} from the counters; the exact inverse
+   * of {@link #addStorage(DatanodeStorageInfo, DatanodeDescriptor)}.
+   */
+  void subtractStorage(final DatanodeStorageInfo info,
+      final DatanodeDescriptor node) {
+    capacityUsed -= info.getDfsUsed();
+    blockPoolUsed -= info.getBlockPoolUsed();
+    if (isInService(node)) {
+      capacityTotal -= info.getCapacity();
+      capacityRemaining -= info.getRemaining();
+      return;
+    }
+    capacityTotal -= info.getDfsUsed();
+  }
+
+  /** Decrements the in-service node count if {@code node} is in service. */
+  void subtractNode(final DatanodeDescriptor node) {
+    if (isInService(node)) {
+      nodesInService--;
+    }
+  }
+
+  /** A node is in service when it is neither decommissioning nor decommissioned. */
+  private static boolean isInService(final DatanodeDescriptor node) {
+    return !node.isDecommissionInProgress() && !node.isDecommissioned();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3fed8e6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
new file mode 100644
index 0000000..43d983d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.util.ajax.JSON;
+
+/**
+ * Class for testing {@link BlockStatsMXBean} implementation.
+ *
+ * <p>Each test runs against a mini cluster of six datanodes: all six have a
+ * RAM_DISK storage, three of them also have DISK and the other three also
+ * have ARCHIVE (one of those with two ARCHIVE storages).
+ */
+public class TestBlockStatsMXBean {
+
+  private MiniDFSCluster cluster;
+
+  /** Starts the six-datanode mini cluster and waits for it to be active. */
+  @Before
+  public void setup() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    cluster = null;
+    StorageType[][] types = new StorageType[6][];
+    for (int i=0; i<3; i++) {
+      types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.DISK};
+    }
+    for (int i=3; i< 5; i++) {
+      types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE};
+    }
+    // The last node has two ARCHIVE storages; the tests expect it to be
+    // counted as a single ARCHIVE node.
+    types[5] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE,
+        StorageType.ARCHIVE};
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6).
+        storageTypes(types).storagesPerDatanode(3).build();
+    cluster.waitActive();
+  }
+
+  /** Shuts the mini cluster down if it was started. */
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  /** Checks the in-service node count per storage type via the Java API. */
+  @Test
+  public void testStorageTypeStats() throws Exception {
+    Map<StorageType, StorageTypeStats> storageTypeStatsMap =
+        cluster.getNamesystem().getBlockManager().getStorageTypeStats();
+    assertTrue(storageTypeStatsMap.containsKey(StorageType.RAM_DISK));
+    assertTrue(storageTypeStatsMap.containsKey(StorageType.DISK));
+    assertTrue(storageTypeStatsMap.containsKey(StorageType.ARCHIVE));
+
+    StorageTypeStats storageTypeStats =
+        storageTypeStatsMap.get(StorageType.RAM_DISK);
+    assertEquals(6, storageTypeStats.getNodesInService());
+
+    storageTypeStats = storageTypeStatsMap.get(StorageType.DISK);
+    assertEquals(3, storageTypeStats.getNodesInService());
+
+    storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE);
+    assertEquals(3, storageTypeStats.getNodesInService());
+  }
+
+  /**
+   * Reads everything served at {@code url} into a string.
+   *
+   * @param url the location to read
+   * @return the content that was read
+   * @throws IOException if the connection cannot be opened or read
+   */
+  protected static String readOutput(URL url) throws IOException {
+    StringBuilder out = new StringBuilder();
+    // try-with-resources: the stream was previously never closed, leaking
+    // the connection, in particular when a read failed part-way.
+    try (InputStream in = url.openConnection().getInputStream()) {
+      byte[] buffer = new byte[64 * 1024];
+      int len = in.read(buffer);
+      while (len > 0) {
+        // NOTE(review): decodes each chunk with the platform default
+        // charset; fine for ASCII output, confirm if that can change.
+        out.append(new String(buffer, 0, len));
+        len = in.read(buffer);
+      }
+    }
+    return out.toString();
+  }
+
+  /**
+   * Checks the in-service node count per storage type as published in the
+   * JSON served under {@code /jmx} for the BlockStats bean.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testStorageTypeStatsJMX() throws Exception {
+    URL baseUrl = new URL (cluster.getHttpUri(0));
+    String result = readOutput(new URL(baseUrl, "/jmx"));
+    System.out.println(result);
+
+    Map<String, Object> stat = (Map<String, Object>) JSON.parse(result);
+    Object[] beans =(Object[]) stat.get("beans");
+    Map<String, Object> blockStats = null;
+    for (Object bean : beans) {
+      Map<String, Object> map = (Map<String, Object>) bean;
+      if (map.get("name").equals("Hadoop:service=NameNode,name=BlockStats")) {
+        blockStats = map;
+      }
+    }
+    assertNotNull(blockStats);
+    Object[] storageTypeStatsList =
+        (Object[])blockStats.get("StorageTypeStats");
+    assertNotNull(storageTypeStatsList);
+    assertEquals (3, storageTypeStatsList.length);
+
+    Set<String> typesPresent = new HashSet<> ();
+    for (Object obj : storageTypeStatsList) {
+      Map<String, Object> entry = (Map<String, Object>)obj;
+      String storageType = (String)entry.get("key");
+      Map<String,Object> storageTypeStats = (Map<String,Object>)entry.get("value");
+      typesPresent.add(storageType);
+      // The expected values are long literals, matching how the original
+      // test compared the parsed JSON numbers.
+      if (storageType.equals("ARCHIVE") || storageType.equals("DISK") ) {
+        assertEquals(3L, storageTypeStats.get("nodesInService"));
+      } else if (storageType.equals("RAM_DISK")) {
+        assertEquals(6L, storageTypeStats.get("nodesInService"));
+      }
+      else {
+        fail("Unexpected storage type in JMX output: " + storageType);
+      }
+    }
+
+    assertTrue(typesPresent.contains("ARCHIVE"));
+    assertTrue(typesPresent.contains("DISK"));
+    assertTrue(typesPresent.contains("RAM_DISK"));
+  }
+}