Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:51:46 UTC

svn commit: r1077753 - /hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java

Author: omalley
Date: Fri Mar  4 04:51:45 2011
New Revision: 1077753

URL: http://svn.apache.org/viewvc?rev=1077753&view=rev
Log:
commit e2b0938f577fe23a2bd5b750322cbb8b76502900
Author: Alfred Thompson <at...@yahoo-inc.com>
Date:   Fri Sep 24 15:06:09 2010 +0000

    This is the initial revision of TestBalancer, a system test for the HDFS Balancer.

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java?rev=1077753&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java Fri Mar  4 04:51:45 2011
@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.net.URI;
+import java.security.SecureRandom;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanException;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.ReflectionException;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hdfs.test.system.HDFSCluster;
+import org.apache.hadoop.hdfs.test.system.NNClient;
+import org.apache.hadoop.hdfs.test.system.DNClient;
+
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.util.ajax.JSON;
+
+public class TestBalancer {
+
+    private static final Log LOG = LogFactory.getLog(TestBalancer.class);
+    private static final String BALANCER_TEMP_DIR = "balancer-temp";
+    private Configuration hadoopConf;
+    private HDFSCluster dfsCluster;
+    private MRCluster mrCluster;
+    // TODO don't hardwire these, introspect the cluster
+    private static final String NAMENODE = "gsbl90277.blue.ygrid.yahoo.com";
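+    // each entry below encodes a JMX endpoint as "hostname-port"; connectJMX() splits on '-'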
+    private static final String[] ENDPOINT_JMX = {
+        "gsbl90277.blue.ygrid.yahoo.com-8008",
+        "gsbl90276.blue.ygrid.yahoo.com-24812",
+        "gsbl90275.blue.ygrid.yahoo.com-24810",
+        "gsbl90274.blue.ygrid.yahoo.com-24808",
+        "gsbl90273.blue.ygrid.yahoo.com-24806",
+        "gsbl90272.blue.ygrid.yahoo.com-24804",
+        "gsbl90271.blue.ygrid.yahoo.com-24802",
+        "gsbl90270.blue.ygrid.yahoo.com-24800"
+    };
+    private Map<String, MBeanServerConnection> endpointMap =
+            new HashMap<String, MBeanServerConnection>();
+
+    public TestBalancer() throws Exception {
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        hadoopConf = new Configuration();
+        dfsCluster = HDFSCluster.createCluster(hadoopConf);
+        dfsCluster.setUp();
+        //TODO no need for mr cluster anymore
+        mrCluster = MRCluster.createCluster(hadoopConf);
+        mrCluster.setUp();
+        connectJMX();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        dfsCluster.tearDown();
+        mrCluster.tearDown();
+    }
+
+    /** Connect to JMX agents on HDFS cluster nodes */
+    private void connectJMX() {
+        final int HOST = 0;
+        final int PORT = 1;
+        for (String endpoint : ENDPOINT_JMX) {
+            String[] toks = endpoint.split("-");
+            String host = toks[HOST];
+            String port = toks[PORT];
+            LOG.info("HOST=" + host + ", PORT=" + port);
+            MBeanServerConnection jmxEndpoint = getJMXEndpoint(host, port);
+            endpointMap.put(host, jmxEndpoint);
+        }
+    }
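+
+    /* Note: this assumes each NameNode/DataNode JVM exposes a remote JMX
+     * connector on the listed port (e.g. started with
+     * -Dcom.sun.management.jmxremote.port=<port>); configuring those agents
+     * is outside the scope of this test. */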
+
+    private long getDataNodeFreeSpace(DNClient datanode) {
+        String dnHost = datanode.getHostName();
+        Object volObj = getDNAttribute(dnHost, "VolumeInfo");
+        Map volInfoMap = (Map) JSON.parse(volObj.toString());
+        long totalFreeSpace = 0L;
+        for (Object key : volInfoMap.keySet()) {
+            Map attrMap = (Map) volInfoMap.get(key);
+            long freeSpace = (Long) attrMap.get("freeSpace");
+            //LOG.info( String.format("volume %s has %d bytes free space left", key, freeSpace) );
+            totalFreeSpace += freeSpace;
+        }
+        //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj));
+        return totalFreeSpace;
+    }
+
+    private long getDataNodeUsedSpace(DNClient datanode) {
+        String dnHost = datanode.getHostName();
+        LOG.debug("checking DFS space used on host "+dnHost);
+        Object volObj = getDNAttribute(dnHost, "VolumeInfo");
+        LOG.debug("retrieved volume info object "+volObj);
+        Map volInfoMap = (Map) JSON.parse(volObj.toString());
+        long totalUsedSpace = 0L;
+        for (Object key : volInfoMap.keySet()) {
+            Map attrMap = (Map) volInfoMap.get(key);
+            // TODO should we be using free space here?
+            long usedSpace = (Long) attrMap.get("usedSpace");
+            LOG.info( String.format("volume %s has %d bytes used space", key, usedSpace) );
+            totalUsedSpace += usedSpace;
+        }
+        //LOG.info(String.format("got from host %s volinfo:\n%s", dnHost, volObj));
+        return totalUsedSpace;
+    }
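+
+    /* For reference, the VolumeInfo attribute parsed above is a JSON map of
+     * volume directory to per-volume attributes. A hypothetical payload
+     * (illustrative values, not captured from a live cluster):
+     *
+     *   {"/grid/0/hdfs/data/current":
+     *       {"usedSpace": 1073741824, "freeSpace": 42949672960, "reservedSpace": 0}}
+     */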
+
+    // TODO just throw the dang exceptions
+    private Object getDNAttribute(String host, String attribName) {
+        ObjectName name = null;
+        Object attribVal = null;
+        try {
+            MBeanServerConnection conn = endpointMap.get(host);
+            name = new ObjectName("HadoopInfo:type=DataNodeInfo");
+            attribVal = conn.getAttribute(name, attribName);
+        } catch (javax.management.AttributeNotFoundException attribNotFoundExc) {
+            LOG.warn(String.format("no attribute matching %s found", attribName),
+                    attribNotFoundExc);
+        } catch (javax.management.MalformedObjectNameException badObjNameExc) {
+            LOG.warn("bad object name: " + name, badObjNameExc);
+        } catch (javax.management.InstanceNotFoundException instNotFoundExc) {
+            LOG.warn("no MBean instance found", instNotFoundExc);
+        } catch (javax.management.ReflectionException reflectExc) {
+            LOG.warn("reflection error!", reflectExc);
+        } catch (javax.management.MBeanException mBeanExc) {
+            LOG.warn("MBean error!", mBeanExc);
+        } catch (java.io.IOException ioExc) {
+            LOG.debug("i/o error!", ioExc);
+        }
+        return attribVal;
+    }
+
+    //@Test
+    public void testJMXRemote() {
+        // reuse the endpoint parsing from setUp()
+        connectJMX();
+
+        Iterator<String> iter = endpointMap.keySet().iterator();
+        while (iter.hasNext()) {
+            String host = iter.next();
+            MBeanServerConnection conn = endpointMap.get(host);
+            ObjectName mBeanName = null;
+            try {
+                if (NAMENODE.equals(host)) {
+                    // TODO make this a constant
+                    mBeanName = new ObjectName("HadoopInfo:type=NameNodeInfo");
+                } else {
+                    mBeanName = new ObjectName("HadoopInfo:type=DataNodeInfo");
+                }
+                Object versionObj = conn.getAttribute(mBeanName, "Version");
+                LOG.info("host [" + host + "] runs version " + versionObj);
+            } catch (javax.management.AttributeNotFoundException attribNotFoundExc) {
+                // TODO don't hard-wire attrib name
+                LOG.warn("no attribute matching `Version' found", attribNotFoundExc);
+            } catch (javax.management.MalformedObjectNameException badObjNameExc) {
+                LOG.warn("bad object name: " + mBeanName, badObjNameExc);
+            } catch (javax.management.InstanceNotFoundException instNotFoundExc) {
+                LOG.warn("no MBean instance found", instNotFoundExc);
+            } catch (javax.management.ReflectionException reflectExc) {
+                LOG.warn("reflection error!", reflectExc);
+            } catch (javax.management.MBeanException mBeanExc) {
+                LOG.warn("MBean error!", mBeanExc);
+            } catch (java.io.IOException ioExc) {
+                LOG.debug("i/o error!", ioExc);
+            }
+        }
+    }
+
+    private MBeanServerConnection getJMXEndpoint(String host, String port) {
+        MBeanServerConnection conn = null;
+        String urlPattern = null;
+        try {
+            urlPattern =
+                    "service:jmx:rmi:///jndi/rmi://"
+                    + host + ":"
+                    + port
+                    + "/jmxrmi";
+            JMXServiceURL url = new JMXServiceURL(urlPattern);
+            JMXConnector connector = JMXConnectorFactory.connect(url);
+            conn = connector.getMBeanServerConnection();
+        } catch (java.net.MalformedURLException badURLExc) {
+            LOG.debug("bad url: " + urlPattern, badURLExc);
+        } catch (java.io.IOException ioExc) {
+            LOG.debug("i/o error!", ioExc);
+        }
+        return conn;
+    }
+    /* debug --
+    public void testHello() {
+        LOG.info("hello!");
+    } */
+
+    //@Test
+    public void testNameNodePing() throws IOException {
+        LOG.info("testing filesystem ping");
+        NNClient namenode = dfsCluster.getNNClient();
+        namenode.ping();
+        LOG.info("done.");
+    }
+
+    //@Test
+    public void testNameNodeConnectDisconnect() throws IOException {
+        LOG.info("connecting to namenode");
+        NNClient namenode = dfsCluster.getNNClient();
+        namenode.connect();
+        LOG.info("done.");
+        LOG.info("disconnecting from namenode");
+        namenode.disconnect();
+    }
+
+    /**
+     * The basic balancer test scenario is as follows:
+     *
+     *  - Bring up cluster with 1 DataNode
+     *  - Load DataNode to >50%
+     *  - Count files/blocks on DataNode
+     *  - Add new, empty DataNode to cluster
+     *  - Run Balancer
+     *  - Count files/blocks on DataNodes
+     *  - Block counts from before and after the Balancer run should be consistent
+     */
+    @Test
+    public void testBalancerBasicScenario() throws IOException {
+        Path balancerTempDir = null;
+        try {
+            DNClient[] datanodes = getReserveDNs();
+            DNClient datanode1 = datanodes[0];
+            DNClient datanode2 = datanodes[1];
+
+            LOG.info("attempting to kill/suspend all the nodes not used for this test");
+            Iterator<DNClient> iter = dfsCluster.getDNClients().iterator();
+            int i = 0;
+            while (iter.hasNext()) {
+                try {
+                    DNClient dn = iter.next();
+                    // TODO kill doesn't work anymore
+                    // TODO ssh to the admin gateway and use sudo yinst to bring down a specific datanode
+                    stopDN( dn );
+                    i++;
+                } catch (Exception e) {
+                    LOG.info("error shutting down node " + i + ": " + e);
+                }
+            }
+            
+            LOG.info("attempting to kill both test nodes");
+            // TODO add check to make sure there is enough capacity on these nodes to run test
+            stopDN(datanode1);
+            stopDN(datanode2);
+
+            LOG.info("starting up datanode ["+
+            datanode1.getHostName()+
+            "] and loading it with data");
+            startDN(datanode1);
+            
+            LOG.info("datanode " + datanode1.getHostName()
+                    + " contains " + getDataNodeUsedSpace(datanode1) + " bytes");
+            // mkdir balancer-temp
+            balancerTempDir = makeTempDir();
+            LOG.info("generating filesystem load");
+            generateFSLoad(2);  // write 2 blocks of test data to the file system
+
+            LOG.info("measure space used on 1st node");
+            long usedSpace0 = getDataNodeUsedSpace(datanode1);
+            LOG.info("datanode " + datanode1.getHostName()
+                    + " contains " + usedSpace0 + " bytes");
+
+            LOG.info("bring up a 2nd node and run balancer on DFS");
+            startDN(datanode2);
+            runBalancer();
+
+            LOG.info("measure blocks and files on both nodes, assert these "
+                    + "counts are identical pre- and post-balancer run");
+            long usedSpace1 = getDataNodeUsedSpace(datanode1);
+            long usedSpace2 = getDataNodeUsedSpace(datanode2);
+            Assert.assertEquals(usedSpace0, usedSpace1 + usedSpace2);
+
+        } catch (Throwable t) {
+            LOG.error("method testBalancerBasicScenario failed", t);
+            // rethrow so the test actually fails; a swallowed Throwable would
+            // also swallow the assertion above
+            throw new RuntimeException("testBalancerBasicScenario failed", t);
+        } finally {
+            // finally block to run cleanup
+            LOG.info("clean off test data from DFS [rmr ~/balancer-temp]");
+            try {
+                deleteTempDir(balancerTempDir);
+            } catch (Exception e) {
+                LOG.warn("problem cleaning up temp dir", e);
+            }
+
+            // restart killed nodes
+            Iterator<DNClient> iter = dfsCluster.getDNClients().iterator();
+
+            while (iter.hasNext()) {
+                DNClient dn = iter.next();
+                startDN( dn );
+            }
+        }
+    }
+
+    /* Select 2 datanodes to reserve for the test and return them;
+     * the caller stops the remaining (non-reserved) nodes. */
+    private DNClient[] getReserveDNs() {
+        List<DNClient> testDNs;
+        List<DNClient> dieDNs;
+        LOG.info("getting collection of live data nodes");
+        List<DNClient> dnList = dfsCluster.getDNClients();
+        int dnCount = dnList.size();
+        if (dnCount < 2) {
+            // TODO throw a non-RuntimeException here instead
+            String msg = String.format(
+                    "not enough datanodes available to run test,"
+                    + " need 2 datanodes but have only %d available",
+                    dnCount);
+            throw new RuntimeException(msg);
+        }
+        LOG.info("selecting 2 nodes for test");
+        dieDNs = new LinkedList<DNClient>(dnList);
+        testDNs = new LinkedList<DNClient>();
+
+        // first pick: any of the dnCount nodes
+        int i = getRandom(dnCount);
+        DNClient testDN = dieDNs.get(i);
+        testDNs.add(testDN);
+        dieDNs.remove(testDN);
+        // second pick: the first node is gone from dieDNs, so any
+        // in-range index now selects a different node
+        i = getRandom(dnCount - 1);
+        testDN = dieDNs.get(i);
+        testDNs.add(testDN);
+        dieDNs.remove(testDN);
+
+        LOG.info("nodes reserved for test");
+        printDNList(testDNs);
+
+        LOG.info("nodes not used in test");
+        printDNList(dieDNs);
+
+        return testDNs.toArray(new DNClient[0]);
+    }
+
+    /**
+     * Return a random integer in the range [0, n).
+     *
+     * @param n exclusive upper bound on the returned value
+     * @return random integer between 0 (inclusive) and n (exclusive)
+     */
+    private int getRandom(int n) {
+        return (int) (n * Math.random());
+    }
+
+    /**
+     * Make a working directory for storing temporary files
+     *
+     * @throws IOException
+     */
+    private Path makeTempDir() throws IOException {
+        Path temp = new Path(BALANCER_TEMP_DIR);
+        FileSystem srcFs = temp.getFileSystem(hadoopConf);
+        FileStatus fstatus = null;
+        try {
+            fstatus = srcFs.getFileStatus(temp);
+            if (fstatus.isDir()) {
+                LOG.warn(BALANCER_TEMP_DIR + ": directory already exists, removing it");
+            } else {
+                LOG.warn(BALANCER_TEMP_DIR + " exists but is not a directory, removing it");
+            }
+            deleteTempDir(temp);
+        } catch (FileNotFoundException fileNotFoundExc) {
+            // path does not exist yet; nothing to clean up
+        } finally {
+            if (!srcFs.mkdirs(temp)) {
+                throw new IOException("failed to create " + BALANCER_TEMP_DIR);
+            }
+        }
+        return temp;
+    }
+
+    /**
+     * Remove the working directory used to store temporary files
+     *
+     * @param temp
+     * @throws IOException
+     */
+    private void deleteTempDir(Path temp) throws IOException {
+        FileSystem srcFs = temp.getFileSystem(hadoopConf);
+        LOG.info("attempting to delete path " + temp + "; this path exists? -> " + srcFs.exists(temp));
+        srcFs.delete(temp, true);
+    }
+
+    private void printDNList(List<DNClient> lis) {
+        for (DNClient datanode : lis) {
+            LOG.info("\t" + datanode.getHostName());
+        }
+    }
+
+    private final static String CMD_STOP_DN = "sudo yinst stop hadoop_datanode_admin";
+    private void stopDN(DNClient dn) {
+        String dnHost = dn.getHostName();
+        runAndWatch(dnHost, CMD_STOP_DN);
+    }
+    private final static String CMD_START_DN = "sudo yinst start hadoop_datanode_admin";
+    private void startDN(DNClient dn) {
+        String dnHost = dn.getHostName();
+        runAndWatch(dnHost, CMD_START_DN);
+    }
+
+    /* using "old" default block size of 64M */
+    private
+    static final int DFS_BLOCK_SIZE = 67108864;
+
+    private void generateFSLoad(int numBlocks) {
+        String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
+        SecureRandom randgen = new SecureRandom();
+        OutputStream out = null;
+        final int CHUNK = 4096;
+        try {
+            // open the destination once; re-creating the file inside the loop
+            // would overwrite the blocks written on earlier iterations
+            FileSystem fs = FileSystem.get(URI.create(destfile), hadoopConf);
+            out = fs.create(new Path(destfile), new ProgressReporter());
+            byte[] bytes = new byte[CHUNK];
+            for (int i = 0; i < numBlocks; i++) {
+                // write one full block of random data, CHUNK bytes at a time
+                for (int z = 0; z < DFS_BLOCK_SIZE; z += CHUNK) {
+                    randgen.nextBytes(bytes);
+                    out.write(bytes, 0, CHUNK);
+                }
+                LOG.info("wrote block " + (i + 1) + " of " + numBlocks);
+            }
+        } catch (IOException ioExc) {
+            LOG.warn("f/s loadgen failed!", ioExc);
+        } finally {
+            IOUtils.closeStream(out);
+        }
+    }
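+
+    /* Note: generateFSLoad(n) writes n * DFS_BLOCK_SIZE bytes of random data
+     * (e.g. generateFSLoad(2) writes 128 MB), enough to leave the first
+     * datanode measurably loaded before the balancer runs. */
+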
+    // TODO this should be taken from the environment
+    public final static String HADOOP_HOME = "/grid/0/gs/gridre/yroot.biga/share/hadoop-current";
+    public final static String CMD_SSH = "/usr/bin/ssh";
+    public final static String CMD_KINIT = "/usr/kerberos/bin/kinit";
+    public final static String CMD_HADOOP = HADOOP_HOME + "/bin/hadoop";
+    public final static String OPT_BALANCER = "balancer";
+    // NOTE this shouldn't be hardwired
+    public final static String HOST_NAMENODE = "gsbl90277.blue.ygrid.yahoo.com";
+    public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab";
+    public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM";
+
+    private void runBalancer() throws IOException {
+        String balancerCommand = String.format("%s -k -t %s %s; %s %s",
+                CMD_KINIT,
+                KERB_KEYTAB,
+                KERB_PRINCIPAL,
+                CMD_HADOOP,
+                OPT_BALANCER);
+        String nnHost = dfsCluster.getNNClient().getHostName();
+        runAndWatch(nnHost, balancerCommand);
+    }
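+
+    /* For reference, with the constants above the remote command expands to
+     * roughly:
+     *
+     *   /usr/kerberos/bin/kinit -k -t /homes/hadoopqa/hadoopqa.dev.headless.keytab \
+     *       hadoopqa@DEV.YGRID.YAHOO.COM; \
+     *   /grid/0/gs/gridre/yroot.biga/share/hadoop-current/bin/hadoop balancer
+     *
+     * i.e. obtain a Kerberos ticket from the keytab, then run the balancer;
+     * runAndWatch() executes the whole line over ssh on the NameNode host.
+     */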
+
+    private void runAndWatch(String remoteHost, String remoteCommand) {
+        try {
+            Process proc = new ProcessBuilder(CMD_SSH, remoteHost, remoteCommand).start();
+            watchProcStream(proc.getInputStream(), System.out);
+            watchProcStream(proc.getErrorStream(), System.err);
+            int exitVal = proc.waitFor();
+            LOG.info("remote command exited with status " + exitVal);
+        } catch(InterruptedException intExc) {
+            LOG.warn("got thread interrupt error", intExc);
+        } catch(IOException ioExc) {
+            LOG.warn("got i/o error", ioExc);
+        }
+    }
+
+    private void watchProcStream(InputStream in, PrintStream out) {
+        new Thread(new StreamWatcher(in, out)).start();
+    }
+
+    static class StreamWatcher implements Runnable {
+
+        BufferedReader reader;
+        PrintStream printer;
+
+        StreamWatcher(InputStream in, PrintStream out) {
+            reader = getReader(in);
+            printer = out;
+        }
+
+        private static BufferedReader getReader(InputStream in) {
+            return new BufferedReader(new InputStreamReader(in));
+        }
+
+        public void run() {
+            try {
+                // drain the stream until EOF so the child process never
+                // blocks on a full stdout/stderr pipe
+                String line;
+                while ((line = reader.readLine()) != null) {
+                    printer.println(line);
+                }
+            } catch (IOException ioExc) {
+                // stream closed; stop watching
+            }
+        }
+    }
+
+    static class ProgressReporter implements Progressable {
+
+        StringBuffer buf = null;
+
+        public void progress() {
+            if (buf == null) {
+                buf = new StringBuffer();
+            }
+            buf.append(".");
+            if (buf.length() == 10000) {
+                LOG.info("..........");
+                buf = null;
+            }
+        }
+    }
+}