You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:51:51 UTC
svn commit: r1077754 -
/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java
Author: omalley
Date: Fri Mar 4 04:51:51 2011
New Revision: 1077754
URL: http://svn.apache.org/viewvc?rev=1077754&view=rev
Log:
commit f98ac7cfb2224293e5bfe2bde470c8dafe7c13ec
Author: Alfred Thompson <at...@yahoo-inc.com>
Date: Thu Sep 30 22:00:30 2010 +0000
This revision of the TestBalancer test case fixes issues with JMX integration.
In this rev, the basic balancer scenario is complete in its implementation.
This version provides a foundation for the development of further balancer
test scenarios.
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java?rev=1077754&r1=1077753&r2=1077754&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java Fri Mar 4 04:51:51 2011
@@ -28,16 +28,15 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
import java.security.SecureRandom;
-import java.util.HashMap;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.management.InstanceNotFoundException;
-import javax.management.MBeanException;
import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
-import javax.management.ReflectionException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
@@ -56,18 +55,12 @@ import org.apache.hadoop.hdfs.test.syste
import org.apache.hadoop.mapreduce.test.system.MRCluster;
-import org.apache.hadoop.examples.RandomWriter;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.test.system.JTProtocol;
-import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.JTClient;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
import org.junit.Assert;
@@ -82,20 +75,6 @@ public class TestBalancer {
private Configuration hadoopConf;
private HDFSCluster dfsCluster;
private MRCluster mrCluster;
- // TODO don't hardwire these, introspect the cluster
- private static final String NAMENODE = "gsbl90277.blue.ygrid.yahoo.com";
- private static final String[] ENDPOINT_JMX = {
- "gsbl90277.blue.ygrid.yahoo.com-8008",
- "gsbl90276.blue.ygrid.yahoo.com-24812",
- "gsbl90275.blue.ygrid.yahoo.com-24810",
- "gsbl90274.blue.ygrid.yahoo.com-24808",
- "gsbl90273.blue.ygrid.yahoo.com-24806",
- "gsbl90272.blue.ygrid.yahoo.com-24804",
- "gsbl90271.blue.ygrid.yahoo.com-24802",
- "gsbl90270.blue.ygrid.yahoo.com-24800"
- };
- private Map<String, MBeanServerConnection> endpointMap =
- new HashMap<String, MBeanServerConnection>();
public TestBalancer() throws Exception {
}
@@ -108,7 +87,7 @@ public class TestBalancer {
//TODO no need for mr cluster anymore
mrCluster = MRCluster.createCluster(hadoopConf);
mrCluster.setUp();
- connectJMX();
+ //connectJMX();
}
@After
@@ -117,148 +96,7 @@ public class TestBalancer {
mrCluster.tearDown();
}
- /** Connect to JMX agents on HDFS cluster nodes */
- private void connectJMX() {
- final int HOST = 0;
- final int PORT = 1;
- for (String endpoint : ENDPOINT_JMX) {
- String[] toks = endpoint.split("-");
- String host = toks[HOST];
- String port = toks[PORT];
- LOG.info("HOST=" + host + ", PORT=" + port);
- MBeanServerConnection jmxEndpoint = getJMXEndpoint(host, port);
- endpointMap.put(host, jmxEndpoint);
- }
- }
-
- private long getDataNodeFreeSpace(DNClient datanode) {
- String dnHost = datanode.getHostName();
- Object volObj = getDNAttribute(dnHost, "VolumeInfo");
- Map volInfoMap = (Map) JSON.parse(volObj.toString());
- long totalFreeSpace = 0L;
- for (Object key : volInfoMap.keySet()) {
- Map attrMap = (Map) volInfoMap.get(key);
- long freeSpace = (Long) attrMap.get("freeSpace");
- //LOG.info( String.format("volume %s has %d bytes free space left", key, freeSpace) );
- totalFreeSpace += freeSpace;
- }
- //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj));
- return totalFreeSpace;
- }
-
- private long getDataNodeUsedSpace(DNClient datanode) {
- String dnHost = datanode.getHostName();
- LOG.debug("checking DFS space used on host "+dnHost);
- Object volObj = getDNAttribute(dnHost, "VolumeInfo");
- LOG.debug("retrieved volume info object "+volObj);
- Map volInfoMap = (Map) JSON.parse(volObj.toString());
- long totalUsedSpace = 0L;
- for (Object key : volInfoMap.keySet()) {
- Map attrMap = (Map) volInfoMap.get(key);
- // TODO should we be using free space here?
- long usedSpace = (Long) attrMap.get("usedSpace");
- LOG.info( String.format("volume %s has %d bytes used space", key, usedSpace) );
- totalUsedSpace += usedSpace;
- }
- //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj));
- return totalUsedSpace;
- }
-
- // TODO just throw the dang exceptions
- private Object getDNAttribute(String host, String attribName) {
- ObjectName name = null;
- Object attribVal = null;
- try {
- MBeanServerConnection conn = endpointMap.get(host);
- name = new ObjectName("HadoopInfo:type=DataNodeInfo");
- attribVal = conn.getAttribute(name, attribName);
- } catch (javax.management.AttributeNotFoundException attribNotFoundExc) {
- LOG.warn(String.format("no attribute matching %s found", attribName),
- attribNotFoundExc);
- } catch (javax.management.MalformedObjectNameException badObjNameExc) {
- LOG.warn("bad object name: " + name, badObjNameExc);
- } catch (javax.management.InstanceNotFoundException instNotFoundExc) {
- LOG.warn("no MBean instance found", instNotFoundExc);
- } catch (javax.management.ReflectionException reflectExc) {
- LOG.warn("reflection error!", reflectExc);
- } catch (javax.management.MBeanException mBeanExc) {
- LOG.warn("MBean error!", mBeanExc);
- } catch (java.io.IOException ioExc) {
- LOG.debug("i/o error!", ioExc);
- }
- return attribVal;
- }
- //@Test
-
- public void testJMXRemote() {
- final int HOST = 0;
- final int PORT = 1;
- for (String endpoint : ENDPOINT_JMX) {
- String[] toks = endpoint.split("-");
- String host = toks[HOST];
- String port = toks[PORT];
- //LOG.info("HOST="+host+", PORT="+port);
- MBeanServerConnection jmxEndpoint = getJMXEndpoint(host, port);
- endpointMap.put(host, jmxEndpoint);
- }
-
-
- Iterator<String> iter = endpointMap.keySet().iterator();
- while (iter.hasNext()) {
- String host = iter.next();
- MBeanServerConnection conn = endpointMap.get(host);
- ObjectName mBeanName = null;
- try {
- if (NAMENODE.equals(host)) {
- // TODO make this a constant
- mBeanName = new ObjectName("HadoopInfo:type=NameNodeInfo");
- } else {
- mBeanName = new ObjectName("HadoopInfo:type=DataNodeInfo");
- }
- Object versionObj = conn.getAttribute(mBeanName, "Version");
- LOG.info("host [" + host + "] runs version " + versionObj);
- } catch (javax.management.AttributeNotFoundException attribNotFoundExc) {
- // TODO don't hard-wire attrib name
- LOG.warn("no attribute matching `Version' found", attribNotFoundExc);
- } catch (javax.management.MalformedObjectNameException badObjNameExc) {
- LOG.warn("bad object name: " + mBeanName, badObjNameExc);
- } catch (javax.management.InstanceNotFoundException instNotFoundExc) {
- LOG.warn("no MBean instance found", instNotFoundExc);
- } catch (javax.management.ReflectionException reflectExc) {
- LOG.warn("reflection error!", reflectExc);
- } catch (javax.management.MBeanException mBeanExc) {
- LOG.warn("MBean error!", mBeanExc);
- } catch (java.io.IOException ioExc) {
- LOG.debug("i/o error!", ioExc);
- }
- }
- }
-
- private MBeanServerConnection getJMXEndpoint(String host, String port) {
- MBeanServerConnection conn = null;
- String urlPattern = null;
- try {
- urlPattern =
- "service:jmx:rmi:///jndi/rmi://"
- + host + ":"
- + port
- + "/jmxrmi";
- JMXServiceURL url = new JMXServiceURL(urlPattern);
- JMXConnector connector = JMXConnectorFactory.connect(url);
- conn = connector.getMBeanServerConnection();
- } catch (java.net.MalformedURLException badURLExc) {
- LOG.debug("bad url: " + urlPattern, badURLExc);
- } catch (java.io.IOException ioExc) {
- LOG.debug("i/o error!", ioExc);
- }
- return conn;
- }
- /* debug--
- public void testHello() {
- LOG.info("hello!");
- }*/
-
- //@Test
+ // Trivial @Test
public void testNameNodePing() throws IOException {
LOG.info("testing filesystem ping");
NNClient namenode = dfsCluster.getNNClient();
@@ -266,7 +104,7 @@ public class TestBalancer {
LOG.info("done.");
}
- //@Test
+ // Trivial @Test
public void testNameNodeConnectDisconnect() throws IOException {
LOG.info("connecting to namenode");
NNClient namenode = dfsCluster.getNNClient();
@@ -294,7 +132,7 @@ public class TestBalancer {
List<DNClient> testDNList = null;
Path balancerTempDir = null;
try {
- DNClient[] datanodes = getReserveDNs();
+ DNClient[] datanodes = getReserveDataNodes();
DNClient datanode1 = datanodes[0];
DNClient datanode2 = datanodes[1];
@@ -304,9 +142,8 @@ public class TestBalancer {
while (iter.hasNext()) {
try {
DNClient dn = iter.next();
- // TODO kill doesn't work anymore
- // TODO do a ssh to admin gateway and sudo yinst with command text do down a specific datanode
- stopDN( dn );
+ // kill doesn't work with secure-HDFS, so using our stopDataNode() method
+ stopDataNode( dn );
i++;
} catch (Exception e) {
LOG.info("error shutting down node " + i + ": " + e);
@@ -315,37 +152,55 @@ public class TestBalancer {
LOG.info("attempting to kill both test nodes");
// TODO add check to make sure there is enough capacity on these nodes to run test
- stopDN(datanode1);
- stopDN(datanode2);
+ stopDataNode(datanode1);
+ stopDataNode(datanode2);
LOG.info("starting up datanode ["+
datanode1.getHostName()+
"] and loading it with data");
- startDN(datanode1);
-
- LOG.info("datanode " + datanode1.getHostName()
- + " contains " + getDataNodeUsedSpace(datanode1) + " bytes");
+ startDataNode(datanode1);
+ // TODO make an appropriate JMXListener interface
+ JMXListenerBean lsnr1 = JMXListenerBean.listenForDataNodeInfo(datanode1);
+
// mkdir balancer-temp
balancerTempDir = makeTempDir();
// TODO write 2 blocks to file system
LOG.info("generating filesystem load");
- generateFSLoad(2); // generate 2 blocks of test data
+ // TODO spec blocks to generate by blockCount, blockSize, # of writers
+ generateFileSystemLoad(2); // generate 2 blocks of test data
LOG.info("measure space used on 1st node");
- long usedSpace0 = getDataNodeUsedSpace(datanode1);
+ long usedSpace0 = lsnr1.getDataNodeUsedSpace();
LOG.info("datanode " + datanode1.getHostName()
+ " contains " + usedSpace0 + " bytes");
LOG.info("bring up a 2nd node and run balancer on DFS");
- startDN(datanode2);
+ startDataNode(datanode2);
runBalancer();
+ JMXListenerBean lsnr2 = JMXListenerBean.listenForDataNodeInfo(datanode2);
LOG.info("measure blocks and files on both nodes, assert these "
+ "counts are identical pre- and post-balancer run");
- long usedSpace1 = getDataNodeUsedSpace(datanode1);
- long usedSpace2 = getDataNodeUsedSpace(datanode2);
- Assert.assertEquals(usedSpace0, usedSpace1 + usedSpace2);
-
+ long usedSpace1 = lsnr1.getDataNodeUsedSpace();
+ long usedSpace2 = lsnr2.getDataNodeUsedSpace();
+ long observedValue = usedSpace1 + usedSpace2;
+ long expectedValue = usedSpace0;
+ int errorTolerance = 10;
+ double toleranceValue = expectedValue * (errorTolerance/100.0);
+ String assertMsg =
+ String.format(
+ "The observed used space [%d] exceeds the expected "+
+ "used space [%d] by more than %d%% tolerance [%.2f]",
+ observedValue, expectedValue,
+ errorTolerance, toleranceValue );
+ Assert.assertTrue(
+ assertMsg,
+ withinTolerance(expectedValue, observedValue, errorTolerance) );
+ LOG.info( String.format(
+ "The observed used space [%d] approximates expected "+
+ "used space [%d] within %d%% tolerance [%.2f]",
+ observedValue, expectedValue,
+ errorTolerance, toleranceValue) );
} catch (Throwable t) {
LOG.info("method testBalancer failed", t);
} finally {
@@ -362,13 +217,13 @@ public class TestBalancer {
while (iter.hasNext()) {
DNClient dn = iter.next();
- startDN( dn );
+ startDataNode( dn );
}
}
}
- /* Kill all datanodes but 2, return a list of the reserved datanodes */
- private DNClient[] getReserveDNs() {
+ /** Kill all datanodes but 2, return a list of the reserved datanodes */
+ private DNClient[] getReserveDataNodes() {
List<DNClient> testDNs = new LinkedList<DNClient>();
List<DNClient> dieDNs = new LinkedList<DNClient>();
LOG.info("getting collection of live data nodes");
@@ -401,10 +256,10 @@ public class TestBalancer {
dieDNs.remove(testDN);
LOG.info("nodes reserved for test");
- printDNList(testDNs);
+ printDataNodeList(testDNs);
LOG.info("nodes not used in test");
- printDNList(dieDNs);
+ printDataNodeList(dieDNs);
DNClient[] arr = new DNClient[]{};
return (DNClient[]) testDNs.toArray(arr);
@@ -422,6 +277,21 @@ public class TestBalancer {
}
/**
+   * Calculate if the error in expected and observed values is within tolerance
+   *
+   * @param expectedValue expected value of experiment
+   * @param observedValue observed value of experiment
+   * @param tolerance per cent tolerance for error, represented as an int
+   * @return true when the absolute difference between observed and expected
+   *         is no more than tolerance per cent of the expected value
+   */
+  private boolean withinTolerance(long expectedValue,
+                                  long observedValue,
+                                  int tolerance) {
+    // compute in floating point: (tolerance/100) in int arithmetic truncates
+    // to 0 for any tolerance below 100, which would make the threshold 0
+    double diff = Math.abs((double) observedValue - (double) expectedValue);
+    double thrs = Math.abs((double) expectedValue) * (tolerance / 100.0);
+    // "within tolerance" means the error does NOT exceed the threshold
+    return diff <= thrs;
+  }
+
+ /**
* Make a working directory for storing temporary files
*
* @throws IOException
@@ -459,28 +329,27 @@ public class TestBalancer {
srcFs.delete(temp, true);
}
- private void printDNList(List<DNClient> lis) {
+ private void printDataNodeList(List<DNClient> lis) {
for (DNClient datanode : lis) {
LOG.info("\t" + datanode.getHostName());
}
}
private final static String CMD_STOP_DN = "sudo yinst stop hadoop_datanode_admin";
- private void stopDN(DNClient dn) {
+ private void stopDataNode(DNClient dn) {
String dnHost = dn.getHostName();
runAndWatch(dnHost, CMD_STOP_DN);
}
private final static String CMD_START_DN = "sudo yinst start hadoop_datanode_admin";
- private void startDN(DNClient dn) {
+ private void startDataNode(DNClient dn) {
String dnHost = dn.getHostName();
runAndWatch(dnHost, CMD_START_DN);
}
/* using "old" default block size of 64M */
- private
- static final int DFS_BLOCK_SIZE = 67108864;
+ private static final int DFS_BLOCK_SIZE = 67108864;
- private void generateFSLoad(int numBlocks) {
+ private void generateFileSystemLoad(int numBlocks) {
String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
SecureRandom randgen = new SecureRandom();
ByteArrayOutputStream dat = null;
@@ -520,8 +389,6 @@ public class TestBalancer {
public final static String CMD_KINIT = "/usr/kerberos/bin/kinit";
public final static String CMD_HADOOP = HADOOP_HOME + "/bin/hadoop";
public final static String OPT_BALANCER = "balancer";
- // NOTE this shouldn't be hardwired
- public final static String HOST_NAMENODE = "gsbl90277.blue.ygrid.yahoo.com";
public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab";
public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM";
@@ -553,10 +420,228 @@ public class TestBalancer {
new Thread(new StreamWatcher(in, out)).start();
}
+  /**
+   * Reads attributes of a Hadoop info MBean from a remote daemon over JMX.
+   *
+   * Instances are created through {@link #listenFor} or
+   * {@link #listenForDataNodeInfo}; the JMX port is discovered from the
+   * <code>-Dcom.sun.management.jmxremote.port</code> option found in the
+   * daemon's HADOOP_OPTS environment variable. No connection is kept open:
+   * every attribute read opens a JMX connector and closes it again.
+   */
+  static class JMXListenerBean {
+
+    static final String OPTION_REMOTE_PORT = "-Dcom.sun.management.jmxremote.port";
+    static final String HADOOP_JMX_SERVICE_NAME = "HadoopInfo";
+    static final String HADOOP_JMX_INFO_DATANODE = "DataNodeInfo";
+
+    /** bookkeeping keys that processVolInfo() adds next to the per-volume entries */
+    private static final String KEY_TOTAL = "total";
+    private static final String KEY_SPACE_TYPE = "spaceType";
+
+    private final static String TITLE_UBAR;
+    private final static String TOTAL_OBAR;
+    static {
+      char[] ubar1 = new char[100];
+      Arrays.fill(ubar1, '=');
+      TITLE_UBAR = new String(ubar1);
+      Arrays.fill(ubar1, '-');
+      TOTAL_OBAR = new String(ubar1);
+    }
+
+    private final String hostName;
+    private final int portNumber;
+    private final ObjectName beanName;
+
+    /**
+     * Create a bean reader for the MBean <code>HadoopInfo:type=typeName</code>
+     * on the host of the given daemon.
+     *
+     * @param remoteDaemon daemon whose host and JMX port are used
+     * @param typeName value of the <code>type</code> key of the MBean name
+     * @throws IllegalArgumentException if the JMX port cannot be determined
+     *         or the resulting MBean name is malformed
+     */
+    public static JMXListenerBean listenFor(
+        AbstractDaemonClient remoteDaemon,
+        String typeName)
+      throws
+        java.io.IOException,
+        InstanceNotFoundException {
+      String hostName = remoteDaemon.getHostName();
+      int portNum = getJmxPortNumber(remoteDaemon);
+      ObjectName jmxBeanName = getJmxBeanName(typeName);
+      return new JMXListenerBean(hostName, portNum, jmxBeanName);
+    }
+
+    /** Shorthand for {@link #listenFor} with the DataNodeInfo bean type. */
+    public static JMXListenerBean listenForDataNodeInfo(
+        AbstractDaemonClient remoteDaemon)
+      throws
+        java.io.IOException,
+        InstanceNotFoundException {
+      return listenFor(remoteDaemon, HADOOP_JMX_INFO_DATANODE);
+    }
+
+    /**
+     * Extract the JMX remote port from the daemon's HADOOP_OPTS. When the
+     * option occurs more than once the last occurrence is used.
+     *
+     * @throws IllegalArgumentException if the option is missing, has no
+     *         value, or its value is not an integer
+     */
+    private static int getJmxPortNumber(AbstractDaemonClient daemon) throws java.io.IOException {
+      String hadoopOpts = daemon.getProcessInfo().getEnv().get("HADOOP_OPTS");
+      int portNumber = 0;
+      boolean found = false;
+      // HADOOP_OPTS may be absent from the daemon's environment; treat that
+      // like "option not present" instead of failing with a NullPointerException
+      String[] options =
+          (hadoopOpts == null) ? new String[0] : hadoopOpts.split(" ");
+      for (String opt : options) {
+        if (opt.startsWith(OPTION_REMOTE_PORT)) {
+          found = true;
+          String[] keyVal = opt.split("=");
+          if (keyVal.length < 2) {
+            throw new IllegalArgumentException("JMX remote port not found");
+          }
+          try {
+            portNumber = Integer.parseInt(keyVal[1]);
+          } catch (NumberFormatException numFmtExc) {
+            throw new IllegalArgumentException(
+                "JMX remote port is not an integer", numFmtExc);
+          }
+        }
+      }
+      if (!found) {
+        String errMsg =
+            String.format("Cannot detect JMX remote port for %s daemon on host %s",
+                getDaemonType(daemon),
+                daemon.getHostName());
+        throw new IllegalArgumentException(errMsg);
+      }
+      return portNumber;
+    }
+
+    /** Human-readable daemon kind, used only in error messages. */
+    private static String getDaemonType(AbstractDaemonClient daemon) {
+      Class<?> daemonClass = daemon.getClass();
+      if (daemonClass.equals(DNClient.class)) {
+        return "datanode";
+      } else if (daemonClass.equals(TTClient.class)) {
+        return "tasktracker";
+      } else if (daemonClass.equals(NNClient.class)) {
+        return "namenode";
+      } else if (daemonClass.equals(JTClient.class)) {
+        return "jobtracker";
+      } else {
+        return "unknown";
+      }
+    }
+
+    /**
+     * Open a JMX connector to the remote daemon. The caller must close it.
+     *
+     * @throws java.io.IOException if the service URL is malformed or the
+     *         connection cannot be made (previously these were logged at
+     *         debug level and surfaced later as a NullPointerException)
+     */
+    private JMXConnector openJmxConnector() throws java.io.IOException {
+      String urlPattern = String.format(
+          "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi",
+          hostName, portNumber);
+      JMXServiceURL url = new JMXServiceURL(urlPattern);
+      return JMXConnectorFactory.connect(url, null);
+    }
+
+    /**
+     * Build the MBean name <code>HadoopInfo:type=typeName</code>.
+     *
+     * @throws IllegalArgumentException if the name is not a valid ObjectName
+     */
+    private static ObjectName getJmxBeanName(String typeName) {
+      String jmxRef = String.format(
+          "%s:type=%s",
+          HADOOP_JMX_SERVICE_NAME, typeName);
+      try {
+        return new ObjectName(jmxRef);
+      } catch (MalformedObjectNameException badObjNameExc) {
+        // fail here rather than hand a null bean name to later JMX calls
+        throw new IllegalArgumentException(
+            "bad jmx name: " + jmxRef, badObjNameExc);
+      }
+    }
+
+    private JMXListenerBean(String hostName, int portNumber, ObjectName beanName)
+      throws
+        IOException,
+        InstanceNotFoundException {
+      this.hostName = hostName;
+      this.portNumber = portNumber;
+      this.beanName = beanName;
+    }
+
+    /**
+     * Read one attribute of the remote MBean over a short-lived connection.
+     *
+     * @param attribName name of the MBean attribute
+     * @return the attribute value as returned by the MBean server
+     */
+    private Object getAttribute(String attribName)
+      throws
+        javax.management.AttributeNotFoundException,
+        javax.management.InstanceNotFoundException,
+        javax.management.ReflectionException,
+        javax.management.MBeanException,
+        java.io.IOException {
+
+      JMXConnector connector = openJmxConnector();
+      try {
+        MBeanServerConnection conn = connector.getMBeanServerConnection();
+        return conn.getAttribute(beanName, attribName);
+      } finally {
+        try {
+          connector.close();
+        } catch (java.io.IOException closeExc) {
+          // do not let a failed close mask the result or the real error
+          LOG.debug("error closing JMX connector to " + hostName, closeExc);
+        }
+      }
+    }
+
+    /**
+     * Log (at debug level) a table of per-volume space plus the total.
+     *
+     * @param volInfoMap a map as produced by {@link #processVolInfo}
+     */
+    private void printVolInfo(Map<?, ?> volInfoMap) {
+      if (!LOG.isDebugEnabled()) {
+        return;
+      }
+      String spaceType = (String) volInfoMap.get(KEY_SPACE_TYPE);
+      String spaceTypeHeader =
+          spaceType.startsWith("used") ? "Space Used" : "Space Free";
+      StringBuilder bldr = new StringBuilder();
+      // the precision (.30/.20) trims the 100-char rule to the column width
+      bldr.append(String.format(
+          "%30s\t%20s\n%30.30s\t%20.20s\n",
+          "Volume", spaceTypeHeader, TITLE_UBAR, TITLE_UBAR));
+      for (Map.Entry<?, ?> vol : volInfoMap.entrySet()) {
+        Object key = vol.getKey();
+        // skip the bookkeeping entries; their values are not attribute maps
+        if (KEY_TOTAL.equals(key) || KEY_SPACE_TYPE.equals(key)) {
+          continue;
+        }
+        Map<?, ?> attrMap = (Map<?, ?>) vol.getValue();
+        long space = ((Number) attrMap.get(spaceType)).longValue();
+        bldr.append(String.format("%30s\t%20d\n", key, space));
+      }
+      bldr.append(String.format(
+          "%30.30s\t%20.20s\n%30s\t%20s",
+          TOTAL_OBAR, TOTAL_OBAR, "Total", volInfoMap.get(KEY_TOTAL)));
+      LOG.debug(bldr.toString());
+    }
+
+    /**
+     * Fetch the VolumeInfo attribute, parse it as JSON and sum one numeric
+     * attribute over all volumes.
+     *
+     * @param spaceType name of the per-volume attribute to sum, e.g.
+     *        "usedSpace" or "freeSpace"
+     * @return the parsed volume map with two extra entries: "total" (the
+     *         sum, as a Long) and "spaceType" (the attribute name)
+     */
+    @SuppressWarnings("unchecked")
+    public Map processVolInfo(String spaceType)
+      throws
+        javax.management.AttributeNotFoundException,
+        javax.management.InstanceNotFoundException,
+        javax.management.ReflectionException,
+        javax.management.MBeanException,
+        java.io.IOException {
+
+      Object volInfo = getAttribute("VolumeInfo");
+      LOG.debug("retrieved volume info object " + volInfo);
+      Map<Object, Object> info =
+          (Map<Object, Object>) JSON.parse(volInfo.toString());
+      long total = 0L;
+      for (Map.Entry<Object, Object> vol : info.entrySet()) {
+        Map<?, ?> attrMap = (Map<?, ?>) vol.getValue();
+        long volAlloc = ((Number) attrMap.get(spaceType)).longValue();
+        LOG.info(String.format("volume %s: %s = %d bytes",
+            vol.getKey(), spaceType, volAlloc));
+        total += volAlloc;
+      }
+      // added after the loop so the bookkeeping keys are never summed
+      info.put(KEY_TOTAL, total);
+      info.put(KEY_SPACE_TYPE, spaceType);
+      return info;
+    }
+
+    /** @return sum of "usedSpace" over all volumes reported by the daemon */
+    public long getDataNodeUsedSpace()
+      throws
+        javax.management.AttributeNotFoundException,
+        javax.management.InstanceNotFoundException,
+        javax.management.ReflectionException,
+        javax.management.MBeanException,
+        java.io.IOException {
+
+      LOG.debug("checking DFS space used on host " + hostName);
+      Map volInfoMap = processVolInfo("usedSpace");
+      printVolInfo(volInfoMap);
+      return ((Number) volInfoMap.get(KEY_TOTAL)).longValue();
+    }
+
+    /** @return sum of "freeSpace" over all volumes reported by the daemon */
+    public long getDataNodeFreeSpace()
+      throws
+        javax.management.AttributeNotFoundException,
+        javax.management.InstanceNotFoundException,
+        javax.management.ReflectionException,
+        javax.management.MBeanException,
+        java.io.IOException {
+
+      LOG.debug("checking DFS space free on host " + hostName);
+      Map volInfoMap = processVolInfo("freeSpace");
+      printVolInfo(volInfoMap);
+      return ((Number) volInfoMap.get(KEY_TOTAL)).longValue();
+    }
+  }
+
+ /** simple utility to watch streams from an exec'ed process */
static class StreamWatcher implements Runnable {
- BufferedReader reader;
- PrintStream printer;
+ private BufferedReader reader;
+ private PrintStream printer;
StreamWatcher(InputStream in, PrintStream out) {
reader = getReader(in);
@@ -577,6 +662,7 @@ public class TestBalancer {
}
}
+ /** simple utility to report progress in generating data */
static class ProgressReporter implements Progressable {
StringBuffer buf = null;