You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2007/12/27 22:39:13 UTC
svn commit: r607126 [3/3] - in /lucene/hadoop/trunk/src:
java/org/apache/hadoop/dfs/FSNamesystem.java
test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java?rev=607126&r1=607125&r2=607126&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/NNThroughputBenchmark.java Thu Dec 27 13:39:12 2007
@@ -1,787 +1,787 @@
-package org.apache.hadoop.dfs;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.log4j.Level;
-
-/**
- * Main class for a series of name-node benchmarks.
- *
- * Each benchmark measures throughput and average execution time
- * of a specific name-node operation, e.g. file creation or block reports.
- *
- * The benchmark does not involve any other hadoop components
- * except for the name-node. Each operation is executed
- * by calling directly the respective name-node method.
- * The name-node here is real all other components are simulated.
- *
- * Command line arguments for the benchmark include:<br>
- * 1) total number of operations to be performed,<br>
- * 2) number of threads to run these operations,<br>
- * 3) followed by operation specific input parameters.
- *
- * Then the benchmark generates inputs for each thread so that the
- * input generation overhead does not effect the resulting statistics.
- * The number of operations performed by threads practically is the same.
- * Precisely, the difference between the number of operations
- * performed by any two threads does not exceed 1.
- *
- * Then the benchmark executes the specified number of operations using
- * the specified number of threads and outputs the resulting stats.
- */
-public class NNThroughputBenchmark {
- private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NNThroughputBenchmark");
- private static final int BLOCK_SIZE = 16;
-
- static Configuration config;
- static NameNode nameNode;
-
- NNThroughputBenchmark(Configuration conf) throws IOException {
- config = conf;
- // We do not need many handlers, since each thread simulates a handler
- // by calling name-node methods directly
- config.setInt("dfs.namenode.handler.count", 1);
- // Start the NameNode
- String[] args = new String[] {};
- nameNode = NameNode.createNameNode(args, config);
- }
-
- void close() throws IOException {
- nameNode.stop();
- }
-
- static void turnOffNameNodeLogging() {
- // change log level to ERROR: NameNode.LOG & NameNode.stateChangeLog
- ((Log4JLogger)NameNode.LOG).getLogger().setLevel(Level.ERROR);
- ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ERROR);
- ((Log4JLogger)NetworkTopology.LOG).getLogger().setLevel(Level.ERROR);
- ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ERROR);
- }
-
- /**
- * Base class for collecting operation statistics.
- *
- * Overload this class in order to run statistics for a
- * specific name-node operation.
- */
- abstract class OperationStatsBase {
- protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
- protected static final String OP_ALL_NAME = "all";
- protected static final String OP_ALL_USAGE = "-op all <other ops options>";
-
- protected String baseDir;
- protected short replication;
- protected int numThreads = 0; // number of threads
- protected int numOpsRequired = 0; // number of operations requested
- protected int numOpsExecuted = 0; // number of operations executed
- protected long cumulativeTime = 0; // sum of times for each op
- protected long elapsedTime = 0; // time from start to finish
-
- /**
- * Operation name.
- */
- abstract String getOpName();
-
- /**
- * Parse command line arguments.
- *
- * @param args arguments
- * @throws IOException
- */
- abstract void parseArguments(String[] args) throws IOException;
-
- /**
- * Generate inputs for each daemon thread.
- *
- * @param opsPerThread number of inputs for each thread.
- * @throws IOException
- */
- abstract void generateInputs(int[] opsPerThread) throws IOException;
-
- /**
- * This corresponds to the arg1 argument of
- * {@link #executeOp(int, int, String)}, which can have different meanings
- * depending on the operation performed.
- *
- * @param daemonId
- * @return the argument
- */
- abstract String getExecutionArgument(int daemonId);
-
- /**
- * Execute name-node operation.
- *
- * @param daemonId id of the daemon calling this method.
- * @param inputIdx serial index of the operation called by the deamon.
- * @param arg1 operation specific argument.
- * @return time of the individual name-node call.
- * @throws IOException
- */
- abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
-
- OperationStatsBase() {
- baseDir = BASE_DIR_NAME + "/" + getOpName();
- replication = (short) config.getInt("dfs.replication", 3);
- numOpsRequired = 10;
- numThreads = 3;
- }
-
- void benchmark() throws IOException {
- List<StatsDaemon> daemons = new ArrayList<StatsDaemon>();
- long start = 0;
- try {
- numOpsExecuted = 0;
- cumulativeTime = 0;
- if(numThreads < 1)
- return;
- int tIdx = 0; // thread index < nrThreads
- int opsPerThread[] = new int[numThreads];
- for(int opsScheduled = 0; opsScheduled < numOpsRequired;
- opsScheduled += opsPerThread[tIdx++]) {
- // execute in a separate thread
- opsPerThread[tIdx] = (numOpsRequired-opsScheduled)/(numThreads-tIdx);
- if(opsPerThread[tIdx] == 0)
- opsPerThread[tIdx] = 1;
- }
- // if numThreads > numOpsRequired then the remaining threads will do nothing
- for(; tIdx < numThreads; tIdx++)
- opsPerThread[tIdx] = 0;
- turnOffNameNodeLogging();
- generateInputs(opsPerThread);
- for(tIdx=0; tIdx < numThreads; tIdx++)
- daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this));
- start = System.currentTimeMillis();
- LOG.info("Starting " + numOpsRequired + " " + getOpName() + "(s).");
- for(StatsDaemon d : daemons)
- d.start();
- } finally {
- while(isInPorgress(daemons)) {
- // try {Thread.sleep(500);} catch (InterruptedException e) {}
- }
- elapsedTime = System.currentTimeMillis() - start;
- for(StatsDaemon d : daemons) {
- incrementStats(d.localNumOpsExecuted, d.localCumulativeTime);
- // System.out.println(d.toString() + ": ops Exec = " + d.localNumOpsExecuted);
- }
- }
- }
-
- private boolean isInPorgress(List<StatsDaemon> daemons) {
- for(StatsDaemon d : daemons)
- if(d.isInPorgress())
- return true;
- return false;
- }
-
- void cleanUp() throws IOException {
- nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
- nameNode.delete(getBaseDir());
- }
-
- int getNumOpsExecuted() {
- return numOpsExecuted;
- }
-
- long getCumulativeTime() {
- return cumulativeTime;
- }
-
- long getElapsedTime() {
- return elapsedTime;
- }
-
- long getAverageTime() {
- return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted;
- }
-
- double getOpsPerSecond() {
- return elapsedTime == 0 ? 0 : 1000*(double)numOpsExecuted / elapsedTime;
- }
-
- String getBaseDir() {
- return baseDir;
- }
-
- String getClientName(int idx) {
- return getOpName() + "-client-" + idx;
- }
-
- void incrementStats(int ops, long time) {
- numOpsExecuted += ops;
- cumulativeTime += time;
- }
-
- /**
- * Parse first 2 arguments, corresponding to the "-op" option.
- *
- * @param args
- * @return true if operation is all, which means that options not related
- * to this operation should be ignored, or false otherwise, meaning
- * that usage should be printed when an unrelated option is encountered.
- * @throws IOException
- */
- protected boolean verifyOpArgument(String[] args) {
- if(args.length < 2 || ! args[0].startsWith("-op"))
- printUsage();
- String type = args[1];
- if(OP_ALL_NAME.equals(type)) {
- type = getOpName();
- return true;
- }
- if(!getOpName().equals(type))
- printUsage();
- return false;
- }
-
- void printResults() {
- LOG.info("--- " + getOpName() + " stats ---");
- LOG.info("# operations: " + getNumOpsExecuted());
- LOG.info("Elapsed Time: " + getElapsedTime());
- LOG.info(" Ops per sec: " + getOpsPerSecond());
- LOG.info("Average Time: " + getAverageTime());
- }
- }
-
- /**
- * One of the threads that perform stats operations.
- */
- private static class StatsDaemon extends Thread {
- private int daemonId;
- private int opsPerThread;
- private String arg1; // argument passed to executeOp()
- private volatile int localNumOpsExecuted = 0;
- private volatile long localCumulativeTime = 0;
- private OperationStatsBase statsOp;
-
- StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) {
- this.daemonId = daemonId;
- this.opsPerThread = nrOps;
- this.statsOp = op;
- // this.clientName = statsOp.getClientName(daemonId);
- setName(toString());
- }
-
- public void run() {
- localNumOpsExecuted = 0;
- localCumulativeTime = 0;
- arg1 = statsOp.getExecutionArgument(daemonId);
- try {
- benchmarkOne();
- } catch(IOException ex) {
- LOG.error("StatsDaemon " + daemonId + " failed: \n"
- + StringUtils.stringifyException(ex));
- }
- }
-
- public String toString() {
- return "StatsDaemon-" + daemonId;
- }
-
- void benchmarkOne() throws IOException {
- for(int idx = 0; idx < opsPerThread; idx++) {
- long stat = statsOp.executeOp(daemonId, idx, arg1);
- localNumOpsExecuted++;
- localCumulativeTime += stat;
- }
- }
-
- boolean isInPorgress() {
- return localNumOpsExecuted < opsPerThread;
- }
- }
-
- /**
- * File name generator.
- *
- * Each directory contains not more than a fixed number (filesPerDir)
- * of files and directories.
- * When the number of files in one directory reaches the maximum,
- * the generator creates a new directory and proceeds generating files in it.
- * The generated namespace tree is balanced that is any path to a leaf
- * file is not less than the height of the tree minus one.
- */
- private static class FileGenerator {
- private static final int DEFAULT_FILES_PER_DIRECTORY = 32;
- // Average file name size is 16.5 bytes
- private static final String FILE_NAME_PREFFIX ="ThrouputBenchfile"; // 17 bytes
- private static final String DIR_NAME_PREFFIX = "ThrouputBenchDir"; // 16 bytes
- // private static final int NUM_CLIENTS = 100;
-
- private int[] pathIndecies = new int[20]; // this will support up to 32**20 = 2**100 = 10**30 files
- private String baseDir;
- private String currentDir;
- private int filesPerDirectory = DEFAULT_FILES_PER_DIRECTORY;
- private long fileCount;
-
- FileGenerator(String baseDir, int filesPerDir) {
- this.baseDir = baseDir;
- this.filesPerDirectory = filesPerDir;
- reset();
- }
-
- String getNextDirName() {
- int depth = 0;
- while(pathIndecies[depth] >= 0)
- depth++;
- int level;
- for(level = depth-1;
- level >= 0 && pathIndecies[level] == filesPerDirectory-1; level--)
- pathIndecies[level] = 0;
- if(level < 0)
- pathIndecies[depth] = 0;
- else
- pathIndecies[level]++;
- level = 0;
- String next = baseDir;
- while(pathIndecies[level] >= 0)
- next = next + "/" + DIR_NAME_PREFFIX + pathIndecies[level++];
- return next;
- }
-
- synchronized String getNextFileName() {
- long fNum = fileCount % filesPerDirectory;
- if(fNum == 0) {
- currentDir = getNextDirName();
- // System.out.println("currentDir: " + currentDir);
- }
- String fn = currentDir + "/" + FILE_NAME_PREFFIX + fileCount;
- // System.out.println("getNextFileName(): " + fn + " fileCount = " + fileCount);
- fileCount++;
- return fn;
- }
-
- private synchronized void reset() {
- Arrays.fill(pathIndecies, -1);
- fileCount = 0L;
- currentDir = "";
- }
- }
-
- /**
- * File creation statistics.
- *
- * Each thread creates the same (+ or -1) number of files.
- * File names are pre-generated during initialization.
- * The created files do not have blocks.
- */
- class CreateFileStats extends OperationStatsBase {
- // Operation types
- static final String OP_CREATE_NAME = "create";
- static final String OP_CREATE_USAGE =
- "-op create [-threads T] [-files N] [-filesPerDir P]";
-
- protected FileGenerator nameGenerator;
- protected String[][] fileNames;
-
- CreateFileStats(String[] args) {
- super();
- parseArguments(args);
- }
-
- String getOpName() {
- return OP_CREATE_NAME;
- }
-
- void parseArguments(String[] args) {
- boolean ignoreUnrelatedOptions = verifyOpArgument(args);
- int nrFilesPerDir = 4;
- for (int i = 2; i < args.length; i++) { // parse command line
- if(args[i].equals("-files")) {
- if(i+1 == args.length) printUsage();
- numOpsRequired = Integer.parseInt(args[++i]);
- } else if(args[i].equals("-threads")) {
- if(i+1 == args.length) printUsage();
- numThreads = Integer.parseInt(args[++i]);
- } else if(args[i].equals("-filesPerDir")) {
- if(i+1 == args.length) printUsage();
- nrFilesPerDir = Integer.parseInt(args[++i]);
- } else if(!ignoreUnrelatedOptions)
- printUsage();
- }
- nameGenerator = new FileGenerator(getBaseDir(), nrFilesPerDir);
- }
-
- void generateInputs(int[] opsPerThread) throws IOException {
- assert opsPerThread.length == numThreads : "Error opsPerThread.length";
- nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
- // int generatedFileIdx = 0;
- fileNames = new String[numThreads][];
- for(int idx=0; idx < numThreads; idx++) {
- int threadOps = opsPerThread[idx];
- fileNames[idx] = new String[threadOps];
- for(int jdx=0; jdx < threadOps; jdx++)
- fileNames[idx][jdx] = nameGenerator.getNextFileName();
- }
- }
-
- void dummyActionNoSynch(int daemonId, int fileIdx) {
- for(int i=0; i < 2000; i++)
- fileNames[daemonId][fileIdx].contains(""+i);
- }
-
- /**
- * returns client name
- */
- String getExecutionArgument(int daemonId) {
- return getClientName(daemonId);
- }
-
- /**
- * Do file create.
- */
- long executeOp(int daemonId, int inputIdx, String clientName)
- throws IOException {
- long start = System.currentTimeMillis();
- // dummyActionNoSynch(fileIdx);
- nameNode.create(fileNames[daemonId][inputIdx], clientName,
- true, replication, BLOCK_SIZE);
- long end = System.currentTimeMillis();
- return end-start;
- }
-
- void printResults() {
- LOG.info("--- " + getOpName() + " inputs ---");
- LOG.info("nrFiles = " + numOpsRequired);
- LOG.info("nrThreads = " + numThreads);
- LOG.info("nrFilesPerDir = " + nameGenerator.filesPerDirectory);
- super.printResults();
- }
- }
-
- /**
- * Open file statistics.
- *
- * Each thread creates the same (+ or -1) number of files.
- * File names are pre-generated during initialization.
- * The created files do not have blocks.
- */
- class OpenFileStats extends CreateFileStats {
- // Operation types
- static final String OP_OPEN_NAME = "open";
- static final String OP_OPEN_USAGE =
- "-op open [-threads T] [-files N] [-filesPerDir P]";
-
- OpenFileStats(String[] args) {
- super(args);
- }
-
- String getOpName() {
- return OP_OPEN_NAME;
- }
-
- void generateInputs(int[] opsPerThread) throws IOException {
- // create files using opsPerThread
- String[] createArgs = new String[] {
- "-op", "create",
- "-threads", String.valueOf(this.numThreads),
- "-files", String.valueOf(numOpsRequired),
- "-filesPerDir", String.valueOf(nameGenerator.filesPerDirectory)};
- CreateFileStats opCreate = new CreateFileStats(createArgs);
- opCreate.benchmark();
- nameNode.rename(opCreate.getBaseDir(), getBaseDir());
- // use the same files for open
- super.generateInputs(opsPerThread);
- }
-
- /**
- * Do file open.
- */
- long executeOp(int daemonId, int inputIdx, String ignore)
- throws IOException {
- long start = System.currentTimeMillis();
- nameNode.open(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
- long end = System.currentTimeMillis();
- return end-start;
- }
- }
-
- /**
- * Minimal datanode simulator.
- */
- private static class TinyDatanode implements Comparable<String> {
- private static final long DF_CAPACITY = 100*1024*1024;
- private static final long DF_USED = 0;
- DatanodeRegistration dnRegistration;
- Block[] blocks;
- int nrBlocks; // actual number of blocks
-
- /**
- * Get data-node in the form
- * <host name> : <port>
- * where port is a 6 digit integer.
- * This is necessary in order to provide lexocographic ordering.
- * Host names are all the same, the ordering goes by port numbers.
- */
- private static String getNodeName(int port) throws IOException {
- String machineName = DNS.getDefaultHost("default", "default");
- String sPort = String.valueOf(100000 + port);
- if(sPort.length() > 6)
- throw new IOException("Too many data-nodes.");
- return machineName + ":" + sPort;
- }
-
- TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
- dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
- this.blocks = new Block[blockCapacity];
- this.nrBlocks = 0;
- }
-
- void register() throws IOException {
- // get versions from the namenode
- NamespaceInfo nsInfo = nameNode.versionRequest();
- dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
- DataNode.setNewStorageID(dnRegistration);
- // get network location
- String networkLoc = NetworkTopology.DEFAULT_RACK;
- // register datanode
- dnRegistration = nameNode.register(dnRegistration, networkLoc);
- }
-
- void sendHeartbeat() throws IOException {
- // register datanode
- DatanodeCommand cmd = nameNode.sendHeartbeat(
- dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
- if(cmd != null)
- LOG.info("sendHeartbeat Name-node reply: " + cmd.getAction());
- }
-
- boolean addBlock(Block blk) {
- if(nrBlocks == blocks.length) {
- LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
- return false;
- }
- blocks[nrBlocks] = blk;
- nrBlocks++;
- return true;
- }
-
- void formBlockReport() {
- // fill remaining slots with blocks that do not exist
- for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
- blocks[idx] = new Block(blocks.length - idx, 0);
- }
-
- public int compareTo(String name) {
- return dnRegistration.getName().compareTo(name);
- }
- }
-
- /**
- * Block report statistics.
- *
- * Each thread here represents its own data-node.
- * Data-nodes send the same block report each time.
- * The block report may contain missing or non-existing blocks.
- */
- class BlockReportStats extends OperationStatsBase {
- static final String OP_BLOCK_REPORT_NAME = "blockReport";
- static final String OP_BLOCK_REPORT_USAGE =
- "-op blockReport [-datanodes T] [-reports R] [-blocksPerReport B] [-blocksPerFile F]";
-
- private int blocksPerReport;
- private int blocksPerFile;
- private TinyDatanode[] datanodes; // array of data-nodes sorted by name
-
- BlockReportStats(String[] args) {
- super();
- this.blocksPerReport = 100;
- this.blocksPerFile = 10;
- // set heartbeat interval to 3 min, so that expiration were 40 min
- config.setLong("dfs.heartbeat.interval", 3 * 60);
- parseArguments(args);
- // adjust replication to the number of data-nodes
- this.replication = (short)Math.min((int)replication, getNumDatanodes());
- }
-
- /**
- * Each thread pretends its a data-node here.
- */
- private int getNumDatanodes() {
- return numThreads;
- }
-
- String getOpName() {
- return OP_BLOCK_REPORT_NAME;
- }
-
- void parseArguments(String[] args) {
- boolean ignoreUnrelatedOptions = verifyOpArgument(args);
- for (int i = 2; i < args.length; i++) { // parse command line
- if(args[i].equals("-reports")) {
- if(i+1 == args.length) printUsage();
- numOpsRequired = Integer.parseInt(args[++i]);
- } else if(args[i].equals("-datanodes")) {
- if(i+1 == args.length) printUsage();
- numThreads = Integer.parseInt(args[++i]);
- } else if(args[i].equals("-blocksPerReport")) {
- if(i+1 == args.length) printUsage();
- blocksPerReport = Integer.parseInt(args[++i]);
- } else if(args[i].equals("-blocksPerFile")) {
- if(i+1 == args.length) printUsage();
- blocksPerFile = Integer.parseInt(args[++i]);
- } else if(!ignoreUnrelatedOptions)
- printUsage();
- }
- }
-
- void generateInputs(int[] ignore) throws IOException {
- int nrDatanodes = getNumDatanodes();
- int nrBlocks = (int)Math.ceil((double)blocksPerReport * nrDatanodes
- / replication);
- int nrFiles = (int)Math.ceil((double)nrBlocks / blocksPerFile);
- datanodes = new TinyDatanode[nrDatanodes];
- // create data-nodes
- String prevDNName = "";
- for(int idx=0; idx < nrDatanodes; idx++) {
- datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
- datanodes[idx].register();
- assert datanodes[idx].dnRegistration.getName().compareTo(prevDNName) > 0
- : "Data-nodes must be sorted lexicographically.";
- datanodes[idx].sendHeartbeat();
- prevDNName = datanodes[idx].dnRegistration.getName();
- }
- // create files
- FileGenerator nameGenerator;
- nameGenerator = new FileGenerator(getBaseDir(), 100);
- String clientName = getClientName(007);
- for(int idx=0; idx < nrFiles; idx++) {
- String fileName = nameGenerator.getNextFileName();
- nameNode.create(fileName, clientName, true, replication, BLOCK_SIZE);
- addBlocks(fileName, clientName);
- nameNode.complete(fileName, clientName);
- }
- // prepare block reports
- for(int idx=0; idx < nrDatanodes; idx++) {
- datanodes[idx].formBlockReport();
- }
- }
-
- private void addBlocks(String fileName, String clientName) throws IOException {
- for(int jdx = 0; jdx < blocksPerFile; jdx++) {
- LocatedBlock loc = nameNode.addBlock(fileName, clientName);
- for(DatanodeInfo dnInfo : loc.getLocations()) {
- int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
- datanodes[dnIdx].addBlock(loc.getBlock());
- nameNode.blockReceived(
- datanodes[dnIdx].dnRegistration,
- new Block[] {loc.getBlock()},
- new String[] {""});
- }
- }
- }
-
- /**
- * Does not require the argument
- */
- String getExecutionArgument(int daemonId) {
- return null;
- }
-
- long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
- assert daemonId < numThreads : "Wrong daemonId.";
- TinyDatanode dn = datanodes[daemonId];
- long start = System.currentTimeMillis();
- nameNode.blockReport(dn.dnRegistration, dn.blocks);
- long end = System.currentTimeMillis();
- return end-start;
- }
-
- /**
- * Defines data-node name since client are data-nodes in this case.
- */
- @Override
- String getClientName(int idx) {
- return getOpName() + "-client-" + idx;
- }
-
- void printResults() {
- String blockDistribution = "";
- String delim = "(";
- for(int idx=0; idx < getNumDatanodes(); idx++) {
- blockDistribution += delim + datanodes[idx].nrBlocks;
- delim = ", ";
- }
- blockDistribution += ")";
- LOG.info("--- " + getOpName() + " inputs ---");
- LOG.info("reports = " + numOpsRequired);
- LOG.info("datanodes = " + numThreads + " " + blockDistribution);
- LOG.info("blocksPerReport = " + blocksPerReport);
- LOG.info("blocksPerFile = " + blocksPerFile);
- super.printResults();
- }
- }
-
- static void printUsage() {
- System.err.println("Usage: NNThroughputBenchmark"
- + "\n\t" + OperationStatsBase.OP_ALL_USAGE
- + " | \n\t" + CreateFileStats.OP_CREATE_USAGE
- + " | \n\t" + OpenFileStats.OP_OPEN_USAGE
- + " | \n\t" + BlockReportStats.OP_BLOCK_REPORT_USAGE
- );
- System.exit(-1);
- }
-
- /**
- * Main method of the benchmark.
- * @param args command line parameters
- */
- public static void runBenchmark(Configuration conf, String[] args) throws Exception {
- if(args.length < 2 || ! args[0].startsWith("-op"))
- printUsage();
-
- String type = args[1];
- boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);
-
- NNThroughputBenchmark bench = null;
- List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
- OperationStatsBase opStat = null;
- try {
- bench = new NNThroughputBenchmark(conf);
- if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) {
- opStat = bench.new CreateFileStats(args);
- ops.add(opStat);
- }
- if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) {
- opStat = bench.new OpenFileStats(args);
- ops.add(opStat);
- }
- if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
- opStat = bench.new BlockReportStats(args);
- ops.add(opStat);
- }
- if(ops.size() == 0)
- printUsage();
- // run each bencmark
- for(OperationStatsBase op : ops) {
- LOG.info("Starting benchmark: " + op.getOpName());
- op.benchmark();
- op.cleanUp();
- }
- // print statistics
- for(OperationStatsBase op : ops) {
- LOG.info("");
- op.printResults();
- }
- } catch(Exception e) {
- LOG.error(StringUtils.stringifyException(e));
- throw e;
- } finally {
- if(bench != null)
- bench.close();
- }
- }
-
- public static void main(String[] args) throws Exception {
- runBenchmark(new Configuration(), args);
- }
-}
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
+
+/**
+ * Main class for a series of name-node benchmarks.
+ *
+ * Each benchmark measures throughput and average execution time
+ * of a specific name-node operation, e.g. file creation or block reports.
+ *
+ * The benchmark does not involve any other hadoop components
+ * except for the name-node. Each operation is executed
+ * by calling directly the respective name-node method.
+ * The name-node here is real all other components are simulated.
+ *
+ * Command line arguments for the benchmark include:<br>
+ * 1) total number of operations to be performed,<br>
+ * 2) number of threads to run these operations,<br>
+ * 3) followed by operation specific input parameters.
+ *
+ * Then the benchmark generates inputs for each thread so that the
+ * input generation overhead does not effect the resulting statistics.
+ * The number of operations performed by threads practically is the same.
+ * Precisely, the difference between the number of operations
+ * performed by any two threads does not exceed 1.
+ *
+ * Then the benchmark executes the specified number of operations using
+ * the specified number of threads and outputs the resulting stats.
+ */
+public class NNThroughputBenchmark {
+ private static final Log LOG = LogFactory.getLog("org.apache.hadoop.dfs.NNThroughputBenchmark");
+ private static final int BLOCK_SIZE = 16;
+
+ static Configuration config;
+ static NameNode nameNode;
+
+ NNThroughputBenchmark(Configuration conf) throws IOException {
+ config = conf;
+ // We do not need many handlers, since each thread simulates a handler
+ // by calling name-node methods directly
+ config.setInt("dfs.namenode.handler.count", 1);
+ // Start the NameNode
+ String[] args = new String[] {};
+ nameNode = NameNode.createNameNode(args, config);
+ }
+
+ void close() throws IOException {
+ nameNode.stop();
+ }
+
+ static void turnOffNameNodeLogging() {
+ // change log level to ERROR: NameNode.LOG & NameNode.stateChangeLog
+ ((Log4JLogger)NameNode.LOG).getLogger().setLevel(Level.ERROR);
+ ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ERROR);
+ ((Log4JLogger)NetworkTopology.LOG).getLogger().setLevel(Level.ERROR);
+ ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ERROR);
+ }
+
+ /**
+ * Base class for collecting operation statistics.
+ *
+ * Overload this class in order to run statistics for a
+ * specific name-node operation.
+ */
+ abstract class OperationStatsBase {
+ protected static final String BASE_DIR_NAME = "/nnThroughputBenchmark";
+ protected static final String OP_ALL_NAME = "all";
+ protected static final String OP_ALL_USAGE = "-op all <other ops options>";
+
+ protected String baseDir;
+ protected short replication;
+ protected int numThreads = 0; // number of threads
+ protected int numOpsRequired = 0; // number of operations requested
+ protected int numOpsExecuted = 0; // number of operations executed
+ protected long cumulativeTime = 0; // sum of times for each op
+ protected long elapsedTime = 0; // time from start to finish
+
+ /**
+ * Operation name.
+ */
+ abstract String getOpName();
+
+ /**
+ * Parse command line arguments.
+ *
+ * @param args arguments
+ * @throws IOException
+ */
+ abstract void parseArguments(String[] args) throws IOException;
+
+ /**
+ * Generate inputs for each daemon thread.
+ *
+ * @param opsPerThread number of inputs for each thread.
+ * @throws IOException
+ */
+ abstract void generateInputs(int[] opsPerThread) throws IOException;
+
+ /**
+ * This corresponds to the arg1 argument of
+ * {@link #executeOp(int, int, String)}, which can have different meanings
+ * depending on the operation performed.
+ *
+ * @param daemonId
+ * @return the argument
+ */
+ abstract String getExecutionArgument(int daemonId);
+
+ /**
+ * Execute name-node operation.
+ *
+ * @param daemonId id of the daemon calling this method.
+ * @param inputIdx serial index of the operation called by the deamon.
+ * @param arg1 operation specific argument.
+ * @return time of the individual name-node call.
+ * @throws IOException
+ */
+ abstract long executeOp(int daemonId, int inputIdx, String arg1) throws IOException;
+
+ OperationStatsBase() {
+ baseDir = BASE_DIR_NAME + "/" + getOpName();
+ replication = (short) config.getInt("dfs.replication", 3);
+ numOpsRequired = 10;
+ numThreads = 3;
+ }
+
+ void benchmark() throws IOException {
+ List<StatsDaemon> daemons = new ArrayList<StatsDaemon>();
+ long start = 0;
+ try {
+ numOpsExecuted = 0;
+ cumulativeTime = 0;
+ if(numThreads < 1)
+ return;
+ int tIdx = 0; // thread index < nrThreads
+ int opsPerThread[] = new int[numThreads];
+ for(int opsScheduled = 0; opsScheduled < numOpsRequired;
+ opsScheduled += opsPerThread[tIdx++]) {
+ // execute in a separate thread
+ opsPerThread[tIdx] = (numOpsRequired-opsScheduled)/(numThreads-tIdx);
+ if(opsPerThread[tIdx] == 0)
+ opsPerThread[tIdx] = 1;
+ }
+ // if numThreads > numOpsRequired then the remaining threads will do nothing
+ for(; tIdx < numThreads; tIdx++)
+ opsPerThread[tIdx] = 0;
+ turnOffNameNodeLogging();
+ generateInputs(opsPerThread);
+ for(tIdx=0; tIdx < numThreads; tIdx++)
+ daemons.add(new StatsDaemon(tIdx, opsPerThread[tIdx], this));
+ start = System.currentTimeMillis();
+ LOG.info("Starting " + numOpsRequired + " " + getOpName() + "(s).");
+ for(StatsDaemon d : daemons)
+ d.start();
+ } finally {
+ while(isInPorgress(daemons)) {
+ // try {Thread.sleep(500);} catch (InterruptedException e) {}
+ }
+ elapsedTime = System.currentTimeMillis() - start;
+ for(StatsDaemon d : daemons) {
+ incrementStats(d.localNumOpsExecuted, d.localCumulativeTime);
+ // System.out.println(d.toString() + ": ops Exec = " + d.localNumOpsExecuted);
+ }
+ }
+ }
+
+ private boolean isInPorgress(List<StatsDaemon> daemons) {
+ for(StatsDaemon d : daemons)
+ if(d.isInPorgress())
+ return true;
+ return false;
+ }
+
+ void cleanUp() throws IOException {
+ nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
+ nameNode.delete(getBaseDir());
+ }
+
+ int getNumOpsExecuted() {
+ return numOpsExecuted;
+ }
+
+ long getCumulativeTime() {
+ return cumulativeTime;
+ }
+
+ long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ long getAverageTime() {
+ return numOpsExecuted == 0 ? 0 : cumulativeTime / numOpsExecuted;
+ }
+
+ double getOpsPerSecond() {
+ return elapsedTime == 0 ? 0 : 1000*(double)numOpsExecuted / elapsedTime;
+ }
+
+ String getBaseDir() {
+ return baseDir;
+ }
+
/**
 * @return client name for the given daemon index,
 *         e.g. "create-client-0"; subclasses may override
 */
String getClientName(int idx) {
  return getOpName() + "-client-" + idx;
}
+
/**
 * Fold one daemon's counters into the aggregate totals.
 * Invoked by the driver only after the daemons have finished,
 * so no synchronization is needed here.
 */
void incrementStats(int ops, long time) {
  numOpsExecuted += ops;
  cumulativeTime += time;
}
+
+ /**
+ * Parse first 2 arguments, corresponding to the "-op" option.
+ *
+ * @param args
+ * @return true if operation is all, which means that options not related
+ * to this operation should be ignored, or false otherwise, meaning
+ * that usage should be printed when an unrelated option is encountered.
+ * @throws IOException
+ */
+ protected boolean verifyOpArgument(String[] args) {
+ if(args.length < 2 || ! args[0].startsWith("-op"))
+ printUsage();
+ String type = args[1];
+ if(OP_ALL_NAME.equals(type)) {
+ type = getOpName();
+ return true;
+ }
+ if(!getOpName().equals(type))
+ printUsage();
+ return false;
+ }
+
+ void printResults() {
+ LOG.info("--- " + getOpName() + " stats ---");
+ LOG.info("# operations: " + getNumOpsExecuted());
+ LOG.info("Elapsed Time: " + getElapsedTime());
+ LOG.info(" Ops per sec: " + getOpsPerSecond());
+ LOG.info("Average Time: " + getAverageTime());
+ }
+ }
+
+ /**
+ * One of the threads that perform stats operations.
+ */
+ private static class StatsDaemon extends Thread {
+ private int daemonId;
+ private int opsPerThread;
+ private String arg1; // argument passed to executeOp()
+ private volatile int localNumOpsExecuted = 0;
+ private volatile long localCumulativeTime = 0;
+ private OperationStatsBase statsOp;
+
+ StatsDaemon(int daemonId, int nrOps, OperationStatsBase op) {
+ this.daemonId = daemonId;
+ this.opsPerThread = nrOps;
+ this.statsOp = op;
+ // this.clientName = statsOp.getClientName(daemonId);
+ setName(toString());
+ }
+
+ public void run() {
+ localNumOpsExecuted = 0;
+ localCumulativeTime = 0;
+ arg1 = statsOp.getExecutionArgument(daemonId);
+ try {
+ benchmarkOne();
+ } catch(IOException ex) {
+ LOG.error("StatsDaemon " + daemonId + " failed: \n"
+ + StringUtils.stringifyException(ex));
+ }
+ }
+
+ public String toString() {
+ return "StatsDaemon-" + daemonId;
+ }
+
+ void benchmarkOne() throws IOException {
+ for(int idx = 0; idx < opsPerThread; idx++) {
+ long stat = statsOp.executeOp(daemonId, idx, arg1);
+ localNumOpsExecuted++;
+ localCumulativeTime += stat;
+ }
+ }
+
+ boolean isInPorgress() {
+ return localNumOpsExecuted < opsPerThread;
+ }
+ }
+
+ /**
+ * File name generator.
+ *
+ * Each directory contains not more than a fixed number (filesPerDir)
+ * of files and directories.
+ * When the number of files in one directory reaches the maximum,
+ * the generator creates a new directory and proceeds generating files in it.
+ * The generated namespace tree is balanced that is any path to a leaf
+ * file is not less than the height of the tree minus one.
+ */
+ private static class FileGenerator {
+ private static final int DEFAULT_FILES_PER_DIRECTORY = 32;
+ // Average file name size is 16.5 bytes
+ private static final String FILE_NAME_PREFFIX ="ThrouputBenchfile"; // 17 bytes
+ private static final String DIR_NAME_PREFFIX = "ThrouputBenchDir"; // 16 bytes
+ // private static final int NUM_CLIENTS = 100;
+
+ private int[] pathIndecies = new int[20]; // this will support up to 32**20 = 2**100 = 10**30 files
+ private String baseDir;
+ private String currentDir;
+ private int filesPerDirectory = DEFAULT_FILES_PER_DIRECTORY;
+ private long fileCount;
+
+ FileGenerator(String baseDir, int filesPerDir) {
+ this.baseDir = baseDir;
+ this.filesPerDirectory = filesPerDir;
+ reset();
+ }
+
+ String getNextDirName() {
+ int depth = 0;
+ while(pathIndecies[depth] >= 0)
+ depth++;
+ int level;
+ for(level = depth-1;
+ level >= 0 && pathIndecies[level] == filesPerDirectory-1; level--)
+ pathIndecies[level] = 0;
+ if(level < 0)
+ pathIndecies[depth] = 0;
+ else
+ pathIndecies[level]++;
+ level = 0;
+ String next = baseDir;
+ while(pathIndecies[level] >= 0)
+ next = next + "/" + DIR_NAME_PREFFIX + pathIndecies[level++];
+ return next;
+ }
+
+ synchronized String getNextFileName() {
+ long fNum = fileCount % filesPerDirectory;
+ if(fNum == 0) {
+ currentDir = getNextDirName();
+ // System.out.println("currentDir: " + currentDir);
+ }
+ String fn = currentDir + "/" + FILE_NAME_PREFFIX + fileCount;
+ // System.out.println("getNextFileName(): " + fn + " fileCount = " + fileCount);
+ fileCount++;
+ return fn;
+ }
+
+ private synchronized void reset() {
+ Arrays.fill(pathIndecies, -1);
+ fileCount = 0L;
+ currentDir = "";
+ }
+ }
+
/**
 * File creation statistics.
 *
 * Each thread creates the same (+ or -1) number of files.
 * File names are pre-generated during initialization.
 * The created files do not have blocks.
 */
class CreateFileStats extends OperationStatsBase {
  // Operation types
  static final String OP_CREATE_NAME = "create";
  static final String OP_CREATE_USAGE =
    "-op create [-threads T] [-files N] [-filesPerDir P]";

  protected FileGenerator nameGenerator;  // produces unique file names
  protected String[][] fileNames;         // [thread][op] pre-generated names

  CreateFileStats(String[] args) {
    super();
    parseArguments(args);
  }

  String getOpName() {
    return OP_CREATE_NAME;
  }

  // Parse operation-specific options; an unknown option is fatal unless
  // running under "-op all" (ignoreUnrelatedOptions == true).
  void parseArguments(String[] args) {
    boolean ignoreUnrelatedOptions = verifyOpArgument(args);
    int nrFilesPerDir = 4;  // default directory fan-out for this operation
    for (int i = 2; i < args.length; i++) { // parse command line
      if(args[i].equals("-files")) {
        if(i+1 == args.length) printUsage();
        numOpsRequired = Integer.parseInt(args[++i]);
      } else if(args[i].equals("-threads")) {
        if(i+1 == args.length) printUsage();
        numThreads = Integer.parseInt(args[++i]);
      } else if(args[i].equals("-filesPerDir")) {
        if(i+1 == args.length) printUsage();
        nrFilesPerDir = Integer.parseInt(args[++i]);
      } else if(!ignoreUnrelatedOptions)
        printUsage();
    }
    nameGenerator = new FileGenerator(getBaseDir(), nrFilesPerDir);
  }

  // Pre-generate all file names so name generation is not part of the
  // timed operation.
  void generateInputs(int[] opsPerThread) throws IOException {
    assert opsPerThread.length == numThreads : "Error opsPerThread.length";
    nameNode.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
    // int generatedFileIdx = 0;
    fileNames = new String[numThreads][];
    for(int idx=0; idx < numThreads; idx++) {
      int threadOps = opsPerThread[idx];
      fileNames[idx] = new String[threadOps];
      for(int jdx=0; jdx < threadOps; jdx++)
        fileNames[idx][jdx] = nameGenerator.getNextFileName();
    }
  }

  // Busy-work for calibration experiments; only referenced from a
  // commented-out line in executeOp().
  void dummyActionNoSynch(int daemonId, int fileIdx) {
    for(int i=0; i < 2000; i++)
      fileNames[daemonId][fileIdx].contains(""+i);
  }

  /**
   * @return the client name for the given daemon,
   *         passed to executeOp() as its argument
   */
  String getExecutionArgument(int daemonId) {
    return getClientName(daemonId);
  }

  /**
   * Do file create.
   * @return elapsed time of the single create call in milliseconds
   */
  long executeOp(int daemonId, int inputIdx, String clientName)
  throws IOException {
    long start = System.currentTimeMillis();
    // dummyActionNoSynch(fileIdx);
    nameNode.create(fileNames[daemonId][inputIdx], clientName,
                    true, replication, BLOCK_SIZE);
    long end = System.currentTimeMillis();
    return end-start;
  }

  // Log this operation's input parameters before the common statistics.
  void printResults() {
    LOG.info("--- " + getOpName() + " inputs ---");
    LOG.info("nrFiles = " + numOpsRequired);
    LOG.info("nrThreads = " + numThreads);
    LOG.info("nrFilesPerDir = " + nameGenerator.filesPerDirectory);
    super.printResults();
  }
}
+
/**
 * Open file statistics.
 *
 * Each thread opens the same (+ or -1) number of files.
 * The files are created beforehand by a nested run of the "create"
 * operation; the pre-generated names are then reused for open.
 */
class OpenFileStats extends CreateFileStats {
  // Operation types
  static final String OP_OPEN_NAME = "open";
  static final String OP_OPEN_USAGE =
    "-op open [-threads T] [-files N] [-filesPerDir P]";

  OpenFileStats(String[] args) {
    super(args);
  }

  String getOpName() {
    return OP_OPEN_NAME;
  }

  // First create the files via a full CreateFileStats benchmark, move them
  // under this operation's base directory, then reuse the generated names.
  void generateInputs(int[] opsPerThread) throws IOException {
    // create files using opsPerThread
    String[] createArgs = new String[] {
          "-op", "create",
          "-threads", String.valueOf(this.numThreads),
          "-files", String.valueOf(numOpsRequired),
          "-filesPerDir", String.valueOf(nameGenerator.filesPerDirectory)};
    CreateFileStats opCreate = new CreateFileStats(createArgs);
    opCreate.benchmark();
    // move the created files into this operation's name space
    nameNode.rename(opCreate.getBaseDir(), getBaseDir());
    // use the same files for open
    super.generateInputs(opsPerThread);
  }

  /**
   * Do file open.
   * @return elapsed time of the single open call in milliseconds
   */
  long executeOp(int daemonId, int inputIdx, String ignore)
  throws IOException {
    long start = System.currentTimeMillis();
    nameNode.open(fileNames[daemonId][inputIdx], 0L, BLOCK_SIZE);
    long end = System.currentTimeMillis();
    return end-start;
  }
}
+
/**
 * Minimal datanode simulator.
 *
 * Holds only a registration and a fixed-capacity block array;
 * there is no storage and no server behind it.
 */
private static class TinyDatanode implements Comparable<String> {
  private static final long DF_CAPACITY = 100*1024*1024;  // reported capacity (bytes)
  private static final long DF_USED = 0;                  // reported used space (bytes)
  DatanodeRegistration dnRegistration;
  Block[] blocks;   // pre-allocated block report array
  int nrBlocks; // actual number of blocks

  /**
   * Get data-node in the form
   * <host name> : <port>
   * where port is a 6 digit integer.
   * This is necessary in order to provide lexicographic ordering.
   * Host names are all the same, the ordering goes by port numbers.
   */
  private static String getNodeName(int port) throws IOException {
    String machineName = DNS.getDefaultHost("default", "default");
    String sPort = String.valueOf(100000 + port);
    if(sPort.length() > 6)
      throw new IOException("Too many data-nodes.");
    return machineName + ":" + sPort;
  }

  TinyDatanode(int dnIdx, int blockCapacity) throws IOException {
    dnRegistration = new DatanodeRegistration(getNodeName(dnIdx));
    this.blocks = new Block[blockCapacity];
    this.nrBlocks = 0;
  }

  /** Register this simulated node with the name-node. */
  void register() throws IOException {
    // get versions from the namenode
    NamespaceInfo nsInfo = nameNode.versionRequest();
    dnRegistration.setStorageInfo(new DataStorage(nsInfo, ""));
    DataNode.setNewStorageID(dnRegistration);
    // get network location
    String networkLoc = NetworkTopology.DEFAULT_RACK;
    // register datanode
    dnRegistration = nameNode.register(dnRegistration, networkLoc);
  }

  /** Send one heartbeat; a command returned by the name-node is only logged. */
  void sendHeartbeat() throws IOException {
    DatanodeCommand cmd = nameNode.sendHeartbeat(
        dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
    if(cmd != null)
      LOG.info("sendHeartbeat Name-node reply: " + cmd.getAction());
  }

  /** Record a block replica; returns false when the report array is full. */
  boolean addBlock(Block blk) {
    if(nrBlocks == blocks.length) {
      LOG.debug("Cannot add block: datanode capacity = " + blocks.length);
      return false;
    }
    blocks[nrBlocks] = blk;
    nrBlocks++;
    return true;
  }

  /** Pad the report so it also contains blocks unknown to the name-node. */
  void formBlockReport() {
    // fill remaining slots with blocks that do not exist
    for(int idx = blocks.length-1; idx >= nrBlocks; idx--)
      blocks[idx] = new Block(blocks.length - idx, 0);
  }

  /** Compare by registered name; enables Arrays.binarySearch over names. */
  public int compareTo(String name) {
    return dnRegistration.getName().compareTo(name);
  }
}
+
+ /**
+ * Block report statistics.
+ *
+ * Each thread here represents its own data-node.
+ * Data-nodes send the same block report each time.
+ * The block report may contain missing or non-existing blocks.
+ */
+ class BlockReportStats extends OperationStatsBase {
+ static final String OP_BLOCK_REPORT_NAME = "blockReport";
+ static final String OP_BLOCK_REPORT_USAGE =
+ "-op blockReport [-datanodes T] [-reports R] [-blocksPerReport B] [-blocksPerFile F]";
+
+ private int blocksPerReport;
+ private int blocksPerFile;
+ private TinyDatanode[] datanodes; // array of data-nodes sorted by name
+
+ BlockReportStats(String[] args) {
+ super();
+ this.blocksPerReport = 100;
+ this.blocksPerFile = 10;
+ // set heartbeat interval to 3 min, so that expiration were 40 min
+ config.setLong("dfs.heartbeat.interval", 3 * 60);
+ parseArguments(args);
+ // adjust replication to the number of data-nodes
+ this.replication = (short)Math.min((int)replication, getNumDatanodes());
+ }
+
+ /**
+ * Each thread pretends its a data-node here.
+ */
+ private int getNumDatanodes() {
+ return numThreads;
+ }
+
+ String getOpName() {
+ return OP_BLOCK_REPORT_NAME;
+ }
+
+ void parseArguments(String[] args) {
+ boolean ignoreUnrelatedOptions = verifyOpArgument(args);
+ for (int i = 2; i < args.length; i++) { // parse command line
+ if(args[i].equals("-reports")) {
+ if(i+1 == args.length) printUsage();
+ numOpsRequired = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-datanodes")) {
+ if(i+1 == args.length) printUsage();
+ numThreads = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-blocksPerReport")) {
+ if(i+1 == args.length) printUsage();
+ blocksPerReport = Integer.parseInt(args[++i]);
+ } else if(args[i].equals("-blocksPerFile")) {
+ if(i+1 == args.length) printUsage();
+ blocksPerFile = Integer.parseInt(args[++i]);
+ } else if(!ignoreUnrelatedOptions)
+ printUsage();
+ }
+ }
+
+ void generateInputs(int[] ignore) throws IOException {
+ int nrDatanodes = getNumDatanodes();
+ int nrBlocks = (int)Math.ceil((double)blocksPerReport * nrDatanodes
+ / replication);
+ int nrFiles = (int)Math.ceil((double)nrBlocks / blocksPerFile);
+ datanodes = new TinyDatanode[nrDatanodes];
+ // create data-nodes
+ String prevDNName = "";
+ for(int idx=0; idx < nrDatanodes; idx++) {
+ datanodes[idx] = new TinyDatanode(idx, blocksPerReport);
+ datanodes[idx].register();
+ assert datanodes[idx].dnRegistration.getName().compareTo(prevDNName) > 0
+ : "Data-nodes must be sorted lexicographically.";
+ datanodes[idx].sendHeartbeat();
+ prevDNName = datanodes[idx].dnRegistration.getName();
+ }
+ // create files
+ FileGenerator nameGenerator;
+ nameGenerator = new FileGenerator(getBaseDir(), 100);
+ String clientName = getClientName(007);
+ for(int idx=0; idx < nrFiles; idx++) {
+ String fileName = nameGenerator.getNextFileName();
+ nameNode.create(fileName, clientName, true, replication, BLOCK_SIZE);
+ addBlocks(fileName, clientName);
+ nameNode.complete(fileName, clientName);
+ }
+ // prepare block reports
+ for(int idx=0; idx < nrDatanodes; idx++) {
+ datanodes[idx].formBlockReport();
+ }
+ }
+
+ private void addBlocks(String fileName, String clientName) throws IOException {
+ for(int jdx = 0; jdx < blocksPerFile; jdx++) {
+ LocatedBlock loc = nameNode.addBlock(fileName, clientName);
+ for(DatanodeInfo dnInfo : loc.getLocations()) {
+ int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
+ datanodes[dnIdx].addBlock(loc.getBlock());
+ nameNode.blockReceived(
+ datanodes[dnIdx].dnRegistration,
+ new Block[] {loc.getBlock()},
+ new String[] {""});
+ }
+ }
+ }
+
+ /**
+ * Does not require the argument
+ */
+ String getExecutionArgument(int daemonId) {
+ return null;
+ }
+
+ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
+ assert daemonId < numThreads : "Wrong daemonId.";
+ TinyDatanode dn = datanodes[daemonId];
+ long start = System.currentTimeMillis();
+ nameNode.blockReport(dn.dnRegistration, dn.blocks);
+ long end = System.currentTimeMillis();
+ return end-start;
+ }
+
+ /**
+ * Defines data-node name since client are data-nodes in this case.
+ */
+ @Override
+ String getClientName(int idx) {
+ return getOpName() + "-client-" + idx;
+ }
+
+ void printResults() {
+ String blockDistribution = "";
+ String delim = "(";
+ for(int idx=0; idx < getNumDatanodes(); idx++) {
+ blockDistribution += delim + datanodes[idx].nrBlocks;
+ delim = ", ";
+ }
+ blockDistribution += ")";
+ LOG.info("--- " + getOpName() + " inputs ---");
+ LOG.info("reports = " + numOpsRequired);
+ LOG.info("datanodes = " + numThreads + " " + blockDistribution);
+ LOG.info("blocksPerReport = " + blocksPerReport);
+ LOG.info("blocksPerFile = " + blocksPerFile);
+ super.printResults();
+ }
+ }
+
+ static void printUsage() {
+ System.err.println("Usage: NNThroughputBenchmark"
+ + "\n\t" + OperationStatsBase.OP_ALL_USAGE
+ + " | \n\t" + CreateFileStats.OP_CREATE_USAGE
+ + " | \n\t" + OpenFileStats.OP_OPEN_USAGE
+ + " | \n\t" + BlockReportStats.OP_BLOCK_REPORT_USAGE
+ );
+ System.exit(-1);
+ }
+
+ /**
+ * Main method of the benchmark.
+ * @param args command line parameters
+ */
+ public static void runBenchmark(Configuration conf, String[] args) throws Exception {
+ if(args.length < 2 || ! args[0].startsWith("-op"))
+ printUsage();
+
+ String type = args[1];
+ boolean runAll = OperationStatsBase.OP_ALL_NAME.equals(type);
+
+ NNThroughputBenchmark bench = null;
+ List<OperationStatsBase> ops = new ArrayList<OperationStatsBase>();
+ OperationStatsBase opStat = null;
+ try {
+ bench = new NNThroughputBenchmark(conf);
+ if(runAll || CreateFileStats.OP_CREATE_NAME.equals(type)) {
+ opStat = bench.new CreateFileStats(args);
+ ops.add(opStat);
+ }
+ if(runAll || OpenFileStats.OP_OPEN_NAME.equals(type)) {
+ opStat = bench.new OpenFileStats(args);
+ ops.add(opStat);
+ }
+ if(runAll || BlockReportStats.OP_BLOCK_REPORT_NAME.equals(type)) {
+ opStat = bench.new BlockReportStats(args);
+ ops.add(opStat);
+ }
+ if(ops.size() == 0)
+ printUsage();
+ // run each bencmark
+ for(OperationStatsBase op : ops) {
+ LOG.info("Starting benchmark: " + op.getOpName());
+ op.benchmark();
+ op.cleanUp();
+ }
+ // print statistics
+ for(OperationStatsBase op : ops) {
+ LOG.info("");
+ op.printResults();
+ }
+ } catch(Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ if(bench != null)
+ bench.close();
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ runBenchmark(new Configuration(), args);
+ }
+}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java?rev=607126&r1=607125&r2=607126&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestNNThroughputBenchmark.java Thu Dec 27 13:39:12 2007
@@ -1,17 +1,17 @@
-package org.apache.hadoop.dfs;
-
-import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
-
-public class TestNNThroughputBenchmark extends TestCase {
-
- /**
- * This test runs all benchmarks defined in {@link NNThroughputBenchmark}.
- */
- public void testNNThroughput() throws Exception {
- Configuration conf = new Configuration();
- conf.set("fs.default.name", "localhost:"+Integer.toString(50017));
- NameNode.format(conf);
- NNThroughputBenchmark.runBenchmark(conf, new String[] {"-op", "all"});
- }
-}
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestNNThroughputBenchmark extends TestCase {
+
+ /**
+ * This test runs all benchmarks defined in {@link NNThroughputBenchmark}.
+ */
+ public void testNNThroughput() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "localhost:"+Integer.toString(50017));
+ NameNode.format(conf);
+ NNThroughputBenchmark.runBenchmark(conf, new String[] {"-op", "all"});
+ }
+}