Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:53:03 UTC
svn commit: r1077766 -
/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java
Author: omalley
Date: Fri Mar 4 04:53:02 2011
New Revision: 1077766
URL: http://svn.apache.org/viewvc?rev=1077766&view=rev
Log:
commit 459a0631758ceea58c82e03f448cf1e5021eefc8
Author: Alfred Thompson <at...@yahoo-inc.com>
Date: Mon Oct 25 23:29:00 2010 +0000
This revision of the DFS Rebalancer system test newly implements or modifies the following tests:
. testBalancerBasicScenario
. testBalancerSimple
. testBalancerOnBalancedCluster
. testBalancerConfiguredWithThresholdValueNegative
. testBalancerConfiguredWithThresholdValueOutOfRange
. testBalancerConfiguredWithThresholdValueAlphanumeric
. testBalancerWithOnlyHalfOfDataNodesRunning
. testBalancerTwoNodeSingleRackClusterWuthNewNodeAdded
Some utility methods have been added or changed to support these tests.
Pending work remains to verify these tests on a working Herriot cluster.
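For orientation, here is a minimal sketch of how the reworked helpers compose in a scenario test. The helper names match the patch below; the scenario itself is illustrative and assumes a running Herriot-instrumented HDFS cluster:

  List<DNClient> testnodes = reserveDatanodesForTest(2); // keep 2 live datanodes for the test
  shutdownNonTestNodes(testnodes);                        // stop every other datanode
  Path tempDir = makeTempDir();                           // working dir for generated data
  generateFileSystemLoad(2);                              // write 2 blocks of random test data
  runBalancerAndVerify(testnodes);                        // run balancer, assert exit status and balance
  deleteTempDir(tempDir);                                 // clean up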
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java?rev=1077766&r1=1077765&r2=1077766&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/system/java/org/apache/hadoop/hdfs/TestBalancer.java Fri Mar 4 04:53:02 2011
@@ -27,22 +27,21 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.URI;
+
import java.security.SecureRandom;
-import java.util.Arrays;
+
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
+import java.util.Set;
+
+import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -53,13 +52,8 @@ import org.apache.hadoop.hdfs.test.syste
import org.apache.hadoop.hdfs.test.system.NNClient;
import org.apache.hadoop.hdfs.test.system.DNClient;
-import org.apache.hadoop.mapreduce.test.system.MRCluster;
import org.apache.hadoop.io.IOUtils;
-
-import org.apache.hadoop.mapreduce.test.system.JTClient;
-import org.apache.hadoop.mapreduce.test.system.TTClient;
-import org.apache.hadoop.test.system.AbstractDaemonClient;
import org.apache.hadoop.util.Progressable;
import org.junit.After;
@@ -74,7 +68,6 @@ public class TestBalancer {
private static final String BALANCER_TEMP_DIR = "balancer-temp";
private Configuration hadoopConf;
private HDFSCluster dfsCluster;
- private MRCluster mrCluster;
public TestBalancer() throws Exception {
}
@@ -84,16 +77,11 @@ public class TestBalancer {
hadoopConf = new Configuration();
dfsCluster = HDFSCluster.createCluster(hadoopConf);
dfsCluster.setUp();
- //TODO no need for mr cluster anymore
- mrCluster = MRCluster.createCluster(hadoopConf);
- mrCluster.setUp();
- //connectJMX();
}
@After
public void tearDown() throws Exception {
dfsCluster.tearDown();
- mrCluster.tearDown();
}
// Trivial @Test
@@ -128,78 +116,37 @@ public class TestBalancer {
*/
@Test
public void testBalancerBasicScenario() throws IOException {
- List<DNClient> killDNList = null;
- List<DNClient> testDNList = null;
Path balancerTempDir = null;
try {
- DNClient[] datanodes = getReserveDatanodes();
- DNClient datanode1 = datanodes[0];
- DNClient datanode2 = datanodes[1];
-
- LOG.info("attempting to kill/suspend all the nodes not used for this test");
- Iterator<DNClient> iter = dfsCluster.getDNClients().iterator();
- int i = 0;
- while (iter.hasNext()) {
- try {
- DNClient dn = iter.next();
- // kill doesn't work with secure-HDFS, so using our stopDataNode() method
- stopDatanode( dn );
- i++;
- } catch (Exception e) {
- LOG.info("error shutting down node " + i + ": " + e);
- }
- }
+ List<DNClient> testnodes = reserveDatanodesForTest(2);
+ DNClient testnode1 = testnodes.get(0);
+ DNClient testnode2 = testnodes.get(1);
+ shutdownNonTestNodes(testnodes);
LOG.info("attempting to kill both test nodes");
- // TODO add check to make sure there is enough capacity on these nodes to run test
- stopDatanode(datanode1);
- stopDatanode(datanode2);
+ stopDatanode(testnode1);
+ stopDatanode(testnode2);
LOG.info("starting up datanode ["+
- datanode1.getHostName()+
+ testnode1.getHostName()+
"] and loading it with data");
- startDatanode(datanode1);
+ startDatanode(testnode1);
// mkdir balancer-temp
balancerTempDir = makeTempDir();
- // TODO write 2 blocks to file system
+ // write 2 blocks to file system
LOG.info("generating filesystem load");
// TODO spec blocks to generate by blockCount, blockSize, # of writers
generateFileSystemLoad(2); // generate 2 blocks of test data
LOG.info("measure space used on 1st node");
- long usedSpace0 = getDatanodeUsedSpace(datanode1);
- LOG.info("datanode " + datanode1.getHostName()
+ long usedSpace0 = getDatanodeUsedSpace(testnode1);
+ LOG.info("datanode " + testnode1.getHostName()
+ " contains " + usedSpace0 + " bytes");
LOG.info("bring up a 2nd node and run balancer on DFS");
- startDatanode(datanode2);
- runBalancer();
-
- //JMXListenerBean lsnr2 = JMXListenerBean.listenForDataNodeInfo(datanode2);
-
- LOG.info("measure blocks and files on both nodes, assert these "
- + "counts are identical pre- and post-balancer run");
- long usedSpace1 = getDatanodeUsedSpace(datanode1);
- long usedSpace2 = getDatanodeUsedSpace(datanode2);
- long observedValue = usedSpace1 + usedSpace2;
- long expectedValue = usedSpace0;
- int errorTolerance = 10;
- double toleranceValue = expectedValue * (errorTolerance/100.0);
- String assertMsg =
- String.format(
- "The observed used space [%d] exceeds the expected "+
- "used space [%d] by more than %d%% tolerance [%.2f]",
- observedValue, expectedValue,
- errorTolerance, toleranceValue );
- Assert.assertTrue(
- assertMsg,
- withinTolerance(expectedValue, observedValue, errorTolerance) );
- LOG.info( String.format(
- "The observed used space [%d] approximates expected "+
- "used space [%d] within %d%% tolerance [%.2f]",
- observedValue, expectedValue,
- errorTolerance, toleranceValue) );
+ startDatanode(testnode2);
+ runBalancerAndVerify(testnodes);
} catch (Throwable t) {
LOG.info("method testBalancer failed", t);
} finally {
@@ -221,23 +168,38 @@ public class TestBalancer {
}
}
- /** Kill all datanodes but 2, return a list of the reserved datanodes */
- private DNClient[] getReserveDatanodes() {
+ private void shutdownNonTestNodes(List<DNClient> testnodes) {
+ Set<DNClient> killSet = new HashSet<DNClient>(getAllDatanodes());
+ killSet.removeAll(testnodes);
+ LOG.info("attempting to kill/suspend all the nodes not used for this test");
+ Iterator<DNClient> iter = killSet.iterator();
+ DNClient dn = null;
+ while (iter.hasNext()) {
+ dn = iter.next();
+ // kill may not work with some secure-HDFS configs,
+ // so using our stopDataNode() method
+ stopDatanode(dn);
+ }
+ }
+
+ /**
+ * Kill all datanodes but leave reservationCount nodes alive,
+ * return a list of the reserved datanodes
+ */
+ private List<DNClient> reserveDatanodesForTest(int reservationCount) {
List<DNClient> testDNs = new LinkedList<DNClient>();
List<DNClient> dieDNs = new LinkedList<DNClient>();
LOG.info("getting collection of live data nodes");
- NNClient namenode = dfsCluster.getNNClient();
- List<DNClient> dnList = dfsCluster.getDNClients();
+ List<DNClient> dnList = getAllDatanodes();
int dnCount = dnList.size();
- if (dnList.size() < 2) {
- // TODO throw a non-RuntimeException here instead
- String msg = String.format(
+ // check to make sure there is enough capacity on these nodes to run test
+ Assert.assertTrue(
+ String.format(
"not enough datanodes available to run test,"
- + " need 2 datanodes but have only %d available",
- dnCount);
- throw new RuntimeException(msg);
- }
- LOG.info("selecting 2 nodes for test");
+ + " need %d datanodes but have only %d available",
+ reservationCount, dnCount),
+ ( dnCount >= reservationCount ));
+ LOG.info("selecting "+reservationCount+" nodes for test");
dieDNs = new LinkedList<DNClient>(dnList);
testDNs = new LinkedList<DNClient>();
@@ -260,8 +222,16 @@ public class TestBalancer {
LOG.info("nodes not used in test");
printDatanodeList(dieDNs);
- DNClient[] arr = new DNClient[]{};
- return (DNClient[]) testDNs.toArray(arr);
+ return testDNs;
+ }
+
+ private List<DNClient> getAllDatanodes() {
+ return dfsCluster.getDNClients();
+ }
+
+ private final static DNClient[] DATANODE_ARRAY = {};
+ private DNClient[] toDatanodeArray(List<DNClient> datanodeList) {
+ return (DNClient[]) datanodeList.toArray(DATANODE_ARRAY);
}
/**
@@ -290,6 +260,45 @@ public class TestBalancer {
return diff > thrs;
}
+ // emulate tolerance calculation in balancer code
+ public final static int DEFAULT_TOLERANCE = 10; // 10%
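+ // a node counts as balanced when its utilization (usedSpace/capacity,
+ // expressed as a percentage) is within tolerance percentage points of the
+ // cluster-wide average; e.g. with a cluster average of 40% and the default
+ // 10% tolerance, nodes between 30% and 50% utilization are balanced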
+ protected boolean isClusterBalanced(DNClient[] datanodes) throws IOException {
+ return isClusterBalanced(datanodes, DEFAULT_TOLERANCE);
+ }
+ protected boolean isClusterBalanced(DNClient[] datanodes, int tolerance)
+ throws IOException {
+
+ Assert.assertFalse("empty datanode array specified",
+ ArrayUtils.isEmpty(datanodes));
+ boolean result = true;
+ double[] utilizationByNode = new double[ datanodes.length ];
+ double totalUsedSpace = 0L;
+ double totalCapacity = 0L;
+ Map datanodeVolumeMap = new HashMap();
+ // accumulate space stored on each node
+ for(int i=0; i<datanodes.length; i++) {
+ DNClient datanode = datanodes[i];
+ Map volumeInfoMap = getDatanodeVolumeAttributes(datanode);
+ long usedSpace = (Long)volumeInfoMap.get(ATTRNAME_USED_SPACE);
+ long capacity = (Long)volumeInfoMap.get(ATTRNAME_CAPACITY );
+ utilizationByNode[i] = ( ((double)usedSpace)/capacity ) * 100;
+ totalUsedSpace += usedSpace;
+ totalCapacity += capacity;
+ }
+ // here we are reusing previously fetched volume-info, for speed
+ // an alternative is to get fresh values from the cluster here instead
+ double avgUtilization = ( totalUsedSpace/totalCapacity ) * 100;
+ for(int i=0; i<datanodes.length; i++) {
+ double varUtilization = Math.abs(avgUtilization - utilizationByNode[i]);
+ if(varUtilization > tolerance) {
+ result = false;
+ break;
+ }
+ }
+
+ return result;
+ }
+
/**
* Make a working directory for storing temporary files
*
@@ -347,17 +356,26 @@ public class TestBalancer {
/* using "old" default block size of 64M */
private static final int DFS_BLOCK_SIZE = 67108864;
-
- private void generateFileSystemLoad(int numBlocks) {
- String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
+ private static final short DEFAULT_REPLICATION = 3;
+ private void generateFileSystemLoad(long numBlocks) {
+ generateFileSystemLoad(numBlocks, DEFAULT_REPLICATION);
+ }
+ private void generateFileSystemLoad(long numBlocks, short replication) {
+ String destfile = "hdfs:///user/hadoopqa/" + BALANCER_TEMP_DIR + "/LOADGEN.DAT";
SecureRandom randgen = new SecureRandom();
ByteArrayOutputStream dat = null;
ByteArrayInputStream in = null;
final int CHUNK = 4096;
+ final Configuration testConf = new Configuration(hadoopConf);
try {
+ testConf.setInt("dfs.replication", replication);
for (int i = 0; i < numBlocks; i++) {
- FileSystem fs = FileSystem.get(URI.create(destfile), hadoopConf);
- OutputStream out = fs.create(new Path(destfile), new ProgressReporter());
+ FileSystem fs = FileSystem.get(
+ URI.create(destfile), testConf);
+ OutputStream out = fs.create(
+ new Path(destfile),
+ replication,
+ new ProgressReporter());
dat = new ByteArrayOutputStream(DFS_BLOCK_SIZE);
for (int z = 0; z < DFS_BLOCK_SIZE; z += CHUNK) {
byte[] bytes = new byte[CHUNK];
@@ -391,36 +409,99 @@ public class TestBalancer {
public final static String KERB_KEYTAB = "/homes/hadoopqa/hadoopqa.dev.headless.keytab";
public final static String KERB_PRINCIPAL = "hadoopqa@DEV.YGRID.YAHOO.COM";
- private void runBalancer() throws IOException {
- String balancerCommand = String.format("\"%s -k -t %s %s; %s %s",
+ public final static int DEFAULT_THRESHOLD = 10;
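+ // the balancer's -threshold argument is the allowed deviation, in percent,
+ // of each datanode's DFS utilization from the cluster-wide average;
+ // values outside the 0-100 range are rejected by the balancer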
+ private int runBalancer() throws IOException {
+ return runBalancer(DEFAULT_THRESHOLD);
+ }
+
+ private int runBalancer(int threshold) throws IOException {
+ return runBalancer(""+threshold);
+ }
+ /*
+ * TODO change the heap size balancer uses so it can run on gateways
+ * i.e., 14G heap is too big for gateways
+ */
+ private int runBalancer(String threshold)
+ throws IOException {
+
+ String balancerCommand = String.format("\"%s -k -t %s %s; %s %s -threshold %s",
CMD_KINIT,
KERB_KEYTAB,
KERB_PRINCIPAL,
CMD_HADOOP,
- OPT_BALANCER);
+ OPT_BALANCER,
+ threshold);
String nnHost = dfsCluster.getNNClient().getHostName();
- runAndWatch(nnHost, balancerCommand);
+ return runAndWatch(nnHost, balancerCommand);
+ }
+ private void runBalancerAndVerify(List<DNClient> testnodes)
+ throws IOException {
+ runBalancerAndVerify(testnodes, DEFAULT_THRESHOLD);
+ }
+ private void runBalancerAndVerify(List<DNClient> testnodes, int threshold)
+ throws IOException {
+ runBalancerAndVerify(testnodes, ""+threshold);
+ }
+ private void runBalancerAndVerify(List<DNClient> testnodes, String threshold)
+ throws IOException {
+ int exitStatus = runBalancer(threshold);
+ // assert balancer exits with status SUCCESS
+ Assert.assertTrue(
+ String.format("balancer returned non-success exit code: %d",
+ exitStatus),
+ (exitStatus == SUCCESS));
+ DNClient[] testnodeArr = toDatanodeArray(testnodes);
+ Assert.assertTrue(
+ "cluster is not balanced",
+ isClusterBalanced(testnodeArr));
}
- private void runAndWatch(String remoteHost, String remoteCommand) {
+ private int runAndWatch(String remoteHost, String remoteCommand) {
+ int exitStatus = -1;
try {
Process proc = new ProcessBuilder(CMD_SSH, remoteHost, remoteCommand).start();
watchProcStream(proc.getInputStream(), System.out);
watchProcStream(proc.getErrorStream(), System.err);
- int exitVal = proc.waitFor();
+ exitStatus = proc.waitFor();
} catch(InterruptedException intExc) {
LOG.warn("got thread interrupt error", intExc);
} catch(IOException ioExc) {
LOG.warn("got i/o error", ioExc);
}
+ return exitStatus;
}
private void watchProcStream(InputStream in, PrintStream out) {
new Thread(new StreamWatcher(in, out)).start();
}
private static final String DATANODE_VOLUME_INFO = "VolumeInfo";
- private static final String ATTRNAME_USED_SPACE="usedSpace";
- private long getDatanodeUsedSpace(DNClient datanode) throws IOException {
+ private static final String ATTRNAME_USED_SPACE = "usedSpace";
+ private static final String ATTRNAME_FREE_SPACE = "freeSpace";
+ // pseudo attribute, JMX doesn't really provide this
+ private static final String ATTRNAME_CAPACITY = "capacity";
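+ // "capacity" is synthesized below in getDatanodeVolumeAttributes() as
+ // usedSpace + freeSpace, each summed across the datanode's volumes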
+ // TODO maybe the static methods below belong in some utility class...
+ private static long getDatanodeUsedSpace(DNClient datanode)
+ throws IOException {
+ return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_USED_SPACE);
+ }/*
+ private static long getDatanodeFreeSpace(DNClient datanode)
+ throws IOException {
+ return (Long)getDatanodeVolumeAttributes(datanode).get(ATTRNAME_FREE_SPACE);
+ }*/
+ private static Map getDatanodeVolumeAttributes(DNClient datanode)
+ throws IOException {
+ Map result = new HashMap();
+ long usedSpace = getVolumeAttribute(datanode, ATTRNAME_USED_SPACE);
+ long freeSpace = getVolumeAttribute(datanode, ATTRNAME_FREE_SPACE);
+ result.put(ATTRNAME_USED_SPACE, usedSpace);
+ result.put(ATTRNAME_CAPACITY, usedSpace+freeSpace);
+ return result;
+ }
+
+ private static long getVolumeAttribute(DNClient datanode,
+ String attribName)
+ throws IOException {
+
Object volInfo = datanode.getDaemonAttribute(DATANODE_VOLUME_INFO);
Assert
.assertNotNull( String
@@ -432,15 +513,15 @@ public class TestBalancer {
DATANODE_VOLUME_INFO,
strVolInfo) );
Map volInfoMap = (Map) JSON.parse(strVolInfo);
- long totalUsedSpace = 0L;
+ long attrVal = 0L;
for(Object key: volInfoMap.keySet()) {
Map attrMap = (Map) volInfoMap.get(key);
- long usedSpace = (Long) attrMap.get(ATTRNAME_USED_SPACE);
- totalUsedSpace += usedSpace;
+ long val = (Long) attrMap.get(attribName);
+ attrVal += val;
}
- return totalUsedSpace;
- }
+ return attrVal;
+ }
/** simple utility to watch streams from an exec'ed process */
static class StreamWatcher implements Runnable {
@@ -483,25 +564,47 @@ public class TestBalancer {
}
}
- /**
+ // A constant for SUCCESS exit code
+ static final int SUCCESS = 1;
+
+ /**
* Balancer_01
* Start balancer and check if the cluster is balanced after the run.
* Cluster should end up in balanced state.
*/
@Test
public void testBalancerSimple() throws IOException {
- // run balancer on "normal"cluster cluster
- throw new UnsupportedOperationException("not implemented yet!");
+
+ DNClient[] datanodes = toDatanodeArray( getAllDatanodes() );
+ int exitStatus = runBalancer();
+ // assert on successful exit code here
+ Assert.assertTrue(
+ String.format("balancer returned non-success exit code: %d",
+ exitStatus),
+ (exitStatus == SUCCESS));
+ Assert.assertTrue( "cluster is not balanced", isClusterBalanced(datanodes) );
+
}
/**
* Balancer_02
* Test a cluster with even distribution, then a new empty node is
- * added to the cluster.
+ * added to the cluster. Here, even distribution effectively means the
+ * cluster is in "balanced" state, as bytes consumed for block allocation
+ * are evenly distributed throughout the cluster.
*/
@Test
public void testBalancerEvenDistributionWithNewNodeAdded() throws IOException {
throw new UnsupportedOperationException("not implemented yet!");
+
+ // get all nodes
+ // need to get an external reserve of nodes we can boot up
+ // to add to this cluster?
+ // HOW?
+
+ // IDEA try to steal some nodes from omega-M for now.....
+ // hmmm also need a way to give an alternate "empty-node" config
+ // to "hide" the data that may already exist on this node
}
/**
@@ -511,6 +614,8 @@ public class TestBalancer {
*/
@Test
public void testBalancerSingleNodeClusterWithNewNodeAdded() throws IOException {
+ // empty datanode: mod config to point to non-default blocks dir.
+ // limit capacity to available storage space
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -522,6 +627,7 @@ public class TestBalancer {
@Test
public void testBalancerSingleNodeClusterWithNewNodeAddedFromDifferentRack()
throws IOException {
+ // need rack awareness
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -532,6 +638,7 @@ public class TestBalancer {
*/
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNode() {
+ // how to limit node capacity?
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -542,6 +649,7 @@ public class TestBalancer {
*/
@Test
public void testBalancerTwoNodeMultiRackCluster() {
+ // need rack awareness
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -554,7 +662,24 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeSingleRackClusterWuthNewNodeAdded()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+
+ final short TEST_REPLICATION_FACTOR = 3;
+ List<DNClient> testnodes = reserveDatanodesForTest(3);
+ DNClient dnA = testnodes.get(0);
+ DNClient dnB = testnodes.get(1);
+
+ DNClient dnC = testnodes.get(2);
+ stopDatanode(dnC);
+
+ // change test: make the cluster ~30% fuller (i.e., 30% over pre-test usage),
+ // using the more heavily used node as the baseline; targetLoad is a block
+ // count, so the byte figure is scaled by 1.0/DFS_BLOCK_SIZE
+ long targetLoad = (long) (
+ (1.0/DFS_BLOCK_SIZE) *
+ 0.30 *
+ Math.max( getDatanodeUsedSpace(dnA), getDatanodeUsedSpace(dnB) ) );
+ generateFileSystemLoad(targetLoad, TEST_REPLICATION_FACTOR);
+ startDatanode(dnC);
+ runBalancerAndVerify(testnodes);
}
/**
@@ -564,6 +689,7 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeMultiRackClusterWithNewNodeAdded()
throws IOException {
+ // need rack awareness
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -574,6 +700,7 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeSingleRackClusterInterruptingRebalance()
throws IOException {
+ // interrupt thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -584,6 +711,7 @@ public class TestBalancer {
@Test
public void testBalancerRestartInterruptedBalancerUntilDone()
throws IOException {
+ // need kill-restart thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -594,6 +722,7 @@ public class TestBalancer {
@Test
public void testBalancerTwoNodeSingleRackShutdownNameNodeDuringRebalance()
throws IOException {
+ // need NN shutdown thread in addition
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -605,6 +734,7 @@ public class TestBalancer {
public void
testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSWrites()
throws IOException {
+ // writer thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -615,6 +745,7 @@ public class TestBalancer {
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletes()
throws IOException {
+ // eraser thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -626,6 +757,7 @@ public class TestBalancer {
@Test
public void testBalancerSingleNodeClusterWithHalfCapacityNewNodeRebalanceWithConcurrentFSDeletesAndWrites()
throws IOException {
+ // writer & eraser threads
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -639,6 +771,22 @@ public class TestBalancer {
*/
@Test
public void testBalancerScalability() throws IOException {
+ /* work in progress->
+ *
+ *
+ List<DNClient> dnList = getAllDatanodes();
+ int dnCount = dnList.size();
+
+ Assert.assertTrue(
+ String.format(
+ "not enough datanodes available to run test,"
+ + " need 2 datanodes but have only %d available",
+ dnCount),
+ ( dnCount == (875 - 2) ));
+
+ List<DNClient> datanodes = reserveDatanodesForTest(750);
+ shutdownNonTestNodes(datanodes);
+ */
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -649,7 +797,12 @@ public class TestBalancer {
@Test
public void testBalancerConfiguredWithThresholdValueNegative()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ List<DNClient> testnodes = getAllDatanodes();
+ final int TRIALS=5;
+ for(int i=0; i<TRIALS; i++) {
+ // ensure a strictly negative threshold in the range [-100, -1]
+ int negThreshold = -1 - (int)(99 * Math.random());
+ runBalancerAndVerify(testnodes, negThreshold);
+ }
}
/**
@@ -660,7 +813,13 @@ public class TestBalancer {
@Test
public void testBalancerConfiguredWithThresholdValueOutOfRange()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ List<DNClient> testnodes = getAllDatanodes();
+ final int[] THRESHOLD_OUT_OF_RANGE_DATA = {
+ -123, 0, -324, 100000, -12222222, 1000000000, -10000, 345, 989
+ };
+ for(int threshold: THRESHOLD_OUT_OF_RANGE_DATA) {
+ runBalancerAndVerify(testnodes, threshold);
+ }
}
/**
@@ -671,7 +830,14 @@ public class TestBalancer {
@Test
public void testBalancerConfiguredWithThresholdValueAlphanumeric()
throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ List<DNClient> testnodes = getAllDatanodes();
+ final String[] THRESHOLD_ALPHA_DATA = {
+ "103dsf", "asd234", "asfd", "ASD", "#$asd", "2345&", "$35", "%34",
+ "0x64", "0xde", "0xad", "0xbe", "0xef"
+ };
+ for(String threshold: THRESHOLD_ALPHA_DATA) {
+ runBalancerAndVerify(testnodes,threshold);
+ }
}
/**
@@ -681,6 +847,7 @@ public class TestBalancer {
@Test
public void testBalancerRunTwoConcurrentInstancesOnSingleGateway()
throws IOException {
+ // do on gateway logic with small balancer heap
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -691,6 +858,7 @@ public class TestBalancer {
@Test
public void testBalancerRunTwoConcurrentInstancesOnDistinctGateways()
throws IOException {
+ // do on gateway logic with small balancer heap
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -700,7 +868,9 @@ public class TestBalancer {
*/
@Test
public void testBalancerOnBalancedCluster() throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
+ // run balancer twice
+ testBalancerSimple();
+ testBalancerSimple();
}
/**
@@ -709,9 +879,12 @@ public class TestBalancer {
*/
@Test
public void testBalancerWithOnlyHalfOfDataNodesRunning()
- throws IOException {
- throw new UnsupportedOperationException("not implemented yet!");
- }
+ throws IOException {
+ List<DNClient> datanodes = getAllDatanodes();
+ int testnodeCount = (int)Math.floor(datanodes.size() * 0.5);
+ List<DNClient> testnodes = reserveDatanodesForTest(testnodeCount);
+ runBalancerAndVerify(testnodes);
+ }
/**
* Balancer_23
@@ -721,6 +894,7 @@ public class TestBalancer {
@Test
public void testBalancerOnBusyClusterWithOnlyHalfOfDatanodesRunning()
throws IOException {
+ // load thread
throw new UnsupportedOperationException("not implemented yet!");
}
@@ -823,7 +997,40 @@ public class TestBalancer {
@Test
public void testNamenodeProtocolGetBlocksFromNonexistentDatanode()
throws IOException {
+ final short replication = 1;
+ Path balancerTempDir = null;
+ try {
+ // reserve 2 nodes for test
+ List<DNClient> testnodes = reserveDatanodesForTest(2);
+ shutdownNonTestNodes(testnodes);
+
+ DNClient testnode1 = testnodes.get(0);
+ DNClient testnode2 = testnodes.get(1);
+
+ // write some blocks with replication factor of 1
+ balancerTempDir = makeTempDir();
+ generateFileSystemLoad(20, replication);
+ // get block locations from NN
+ NNClient namenode = dfsCluster.getNNClient();
+ // TODO extend namenode to get block locations
+ //namenode.get
+
+ // shutdown 1 node
+ stopDatanode(testnode1);
+
+ // attempt to retrieve blocks from the dead node
+ // we should fail
+ } finally {
+ // cleanup
+ // finally block to run cleanup
+ LOG.info("clean off test data from DFS [rmr ~/balancer-temp]");
+ try {
+ deleteTempDir(balancerTempDir);
+ } catch (Exception e) {
+ LOG.warn("problem cleaning up temp dir", e);
+ }
+ }
}
}