Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2010/05/21 02:02:13 UTC
svn commit: r946832 [1/3] - in /hadoop/mapreduce/trunk: ./
src/test/mapred/org/apache/hadoop/fs/slive/
Author: shv
Date: Fri May 21 00:02:12 2010
New Revision: 946832
URL: http://svn.apache.org/viewvc?rev=946832&view=rev
Log:
MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708. Contributed by Joshua Harlow.
Added:
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataVerifier.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataWriter.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DeleteOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DummyInputFormat.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Formatter.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Helper.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ListOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/MkdirOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ObserveableOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Operation.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationData.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationFactory.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationOutput.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/OperationWeight.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/PathFinder.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Range.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReadOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RenameOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ReportWriter.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/RouletteSelector.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SleepOp.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveMapper.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveReducer.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/SliveTest.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/TestSlive.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Timer.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/WeightSelector.java (with props)
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Weights.java (with props)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=946832&r1=946831&r2=946832&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri May 21 00:02:12 2010
@@ -23,6 +23,9 @@ Trunk (unreleased changes)
MAPREDUCE-1539. authorization checks for inter-server protocol
(based on HADOOP-6600) (Boris Shkolnik via shv)
+ MAPREDUCE-1804. Stress-test tool for HDFS introduced in HDFS-708.
+ (Joshua Harlow via shv)
+
OPTIMIZATIONS
BUG FIXES
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java Fri May 21 00:02:12 2010
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.DataWriter.GenerateOutput;
+import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
+
+/**
+ * Operation which selects a random file and appends a random amount of bytes
+ * (selected from the configuration for append size) to that file if it exists.
+ *
+ * On success, this operation captures statistics for bytes written, time
+ * taken (milliseconds), and success count; on failure, it captures the
+ * number of failures and the time taken (milliseconds) to fail.
+ */
+class AppendOp extends Operation {
+
+ private static final Log LOG = LogFactory.getLog(AppendOp.class);
+
+ AppendOp(ConfigExtractor cfg, Random rnd) {
+ super(AppendOp.class.getSimpleName(), cfg, rnd);
+ }
+
+ /**
+ * Gets the file to append to
+ *
+ * @return Path
+ */
+ protected Path getAppendFile() {
+ Path fn = getFinder().getFile();
+ return fn;
+ }
+
+ @Override // Operation
+ List<OperationOutput> run(FileSystem fs) {
+ List<OperationOutput> out = super.run(fs);
+ OutputStream os = null;
+ try {
+ Path fn = getAppendFile();
+ // determine the append size range
+ // (use the block size range if so configured)
+ Range<Long> appendSizeRange = getConfig().getAppendSize();
+ if (getConfig().shouldAppendUseBlockSize()) {
+ appendSizeRange = getConfig().getBlockSize();
+ }
+ long appendSize = Range.betweenPositive(getRandom(), appendSizeRange);
+ long timeTaken = 0, bytesAppended = 0;
+ DataWriter writer = new DataWriter(getRandom());
+ LOG.info("Attempting to append to file at " + fn + " of size "
+ + Helper.toByteInfo(appendSize));
+ {
+ // open
+ long startTime = Timer.now();
+ os = fs.append(fn);
+ timeTaken += Timer.elapsed(startTime);
+ // append given length
+ GenerateOutput stats = writer.writeSegment(appendSize, os);
+ timeTaken += stats.getTimeTaken();
+ bytesAppended += stats.getBytesWritten();
+ // capture close time
+ startTime = Timer.now();
+ os.close();
+ os = null;
+ timeTaken += Timer.elapsed(startTime);
+ }
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.BYTES_WRITTEN, bytesAppended));
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.OK_TIME_TAKEN, timeTaken));
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.SUCCESSES, 1L));
+ LOG.info("Appended " + Helper.toByteInfo(bytesAppended) + " to file "
+ + fn + " in " + timeTaken + " milliseconds");
+ } catch (FileNotFoundException e) {
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.NOT_FOUND, 1L));
+ LOG.warn("Error with appending", e);
+ } catch (IOException e) {
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.FAILURES, 1L));
+ LOG.warn("Error with appending", e);
+ } finally {
+ if (os != null) {
+ try {
+ os.close();
+ } catch (IOException e) {
+ LOG.warn("Error with closing append stream", e);
+ }
+ }
+ }
+ return out;
+ }
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/AppendOp.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
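For illustration, here is a minimal sketch of how an operation such as the one above gets driven (an assumption-laden fragment, not part of this commit: it would have to live inside the org.apache.hadoop.fs.slive package since these classes are package-private, and SliveMapper is the real driver; cfg and rnd stand for a configured ConfigExtractor and a seeded Random):

    // hypothetical driver fragment, not part of this commit
    static void runOnce(ConfigExtractor cfg, Random rnd) throws IOException {
      FileSystem fs = FileSystem.get(cfg.getConfig());
      Operation op = new AppendOp(cfg, rnd);
      List<OperationOutput> results = op.run(fs);
      for (OperationOutput out : results) {
        // each entry is a typed measurement keyed by a ReportWriter
        // constant, e.g. bytes written, time taken, successes
        System.out.println(out);
      }
    }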
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java Fri May 21 00:02:12 2010
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.fs.slive.Constants.Distribution;
+import org.apache.hadoop.fs.slive.Constants.OperationType;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Class which abstracts the parsing of command line arguments for the slive test
+ */
+class ArgumentParser {
+
+ private Options optList;
+ private String[] argumentList;
+ private ParsedOutput parsed;
+
+ /**
+ * The result of a parse is returned as this object
+ */
+ static class ParsedOutput {
+ private CommandLine parsedData;
+ private ArgumentParser source;
+ private boolean needHelp;
+
+ ParsedOutput(CommandLine parsedData, ArgumentParser source,
+ boolean needHelp) {
+ this.parsedData = parsedData;
+ this.source = source;
+ this.needHelp = needHelp;
+ }
+
+ /**
+ * @return whether the calling object should output help and exit
+ */
+ boolean shouldOutputHelp() {
+ return needHelp;
+ }
+
+ /**
+ * Outputs the formatted help to standard out
+ */
+ void outputHelp() {
+ if (!shouldOutputHelp()) {
+ return;
+ }
+ if (source != null) {
+ HelpFormatter hlp = new HelpFormatter();
+ hlp.printHelp(Constants.PROG_NAME + " " + Constants.PROG_VERSION,
+ source.getOptionList());
+ }
+ }
+
+ /**
+ * @param optName
+ * the option name to get the value for
+ *
+ * @return the option value or null if it does not exist
+ */
+ String getValue(String optName) {
+ if (parsedData == null) {
+ return null;
+ }
+ return parsedData.getOptionValue(optName);
+ }
+
+ public String toString() {
+ StringBuilder s = new StringBuilder();
+ if (parsedData != null) {
+ Option[] ops = parsedData.getOptions();
+ for (int i = 0; i < ops.length; ++i) {
+ s.append(ops[i].getOpt() + " = " + ops[i].getValue() + ",");
+ }
+ }
+ return s.toString();
+ }
+
+ }
+
+ ArgumentParser(String[] args) {
+ optList = getOptions();
+ if (args == null) {
+ args = new String[] {};
+ }
+ argumentList = args;
+ parsed = null;
+ }
+
+ private Options getOptionList() {
+ return optList;
+ }
+
+ /**
+ * Parses the command line options
+ *
+ * @return the parsed output, with its help flag set if help should be printed
+ *
+ * @throws Exception
+ * when parsing fails
+ */
+ ParsedOutput parse() throws Exception {
+ if (parsed == null) {
+ PosixParser parser = new PosixParser();
+ CommandLine popts = parser.parse(getOptionList(), argumentList, true);
+ if (popts.hasOption(ConfigOption.HELP.getOpt())) {
+ parsed = new ParsedOutput(null, this, true);
+ } else {
+ parsed = new ParsedOutput(popts, this, false);
+ }
+ }
+ return parsed;
+ }
+
+ /**
+ * @return the option set to be used in command line parsing
+ */
+ private Options getOptions() {
+ Options cliopt = new Options();
+ cliopt.addOption(ConfigOption.MAPS);
+ cliopt.addOption(ConfigOption.PACKET_SIZE);
+ cliopt.addOption(ConfigOption.OPS);
+ cliopt.addOption(ConfigOption.DURATION);
+ cliopt.addOption(ConfigOption.EXIT_ON_ERROR);
+ cliopt.addOption(ConfigOption.SLEEP_TIME);
+ cliopt.addOption(ConfigOption.FILES);
+ cliopt.addOption(ConfigOption.DIR_SIZE);
+ cliopt.addOption(ConfigOption.BASE_DIR);
+ cliopt.addOption(ConfigOption.RESULT_FILE);
+ cliopt.addOption(ConfigOption.CLEANUP);
+ {
+ String distStrs[] = new String[Distribution.values().length];
+ Distribution distValues[] = Distribution.values();
+ for (int i = 0; i < distValues.length; ++i) {
+ distStrs[i] = distValues[i].lowerName();
+ }
+ String opdesc = String.format(Constants.OP_DESCR, StringUtils
+ .arrayToString(distStrs));
+ for (OperationType type : OperationType.values()) {
+ String opname = type.lowerName();
+ cliopt.addOption(new Option(opname, true, opdesc));
+ }
+ }
+ cliopt.addOption(ConfigOption.REPLICATION_AM);
+ cliopt.addOption(ConfigOption.BLOCK_SIZE);
+ cliopt.addOption(ConfigOption.READ_SIZE);
+ cliopt.addOption(ConfigOption.WRITE_SIZE);
+ cliopt.addOption(ConfigOption.APPEND_SIZE);
+ cliopt.addOption(ConfigOption.RANDOM_SEED);
+ cliopt.addOption(ConfigOption.QUEUE_NAME);
+ cliopt.addOption(ConfigOption.HELP);
+ return cliopt;
+ }
+
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ArgumentParser.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
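A hedged usage sketch for the parser above (illustrative fragment, not part of this commit; assumes a caller in the same package):

    // hypothetical entry fragment
    static void run(String[] args) throws Exception {
      ArgumentParser.ParsedOutput parsed = new ArgumentParser(args).parse();
      if (parsed.shouldOutputHelp()) {
        parsed.outputHelp(); // prints formatted usage to standard out
        return;
      }
      // option values are looked up by their command line name
      String baseDir = parsed.getValue(ConfigOption.BASE_DIR.getOpt());
      System.out.println("base dir = " + baseDir);
    }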
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java Fri May 21 00:02:12 2010
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.IOException;
+
+/**
+ * Exception used to signify file reading failures where headers are bad or an
+ * unexpected EOF occurs when it should not.
+ */
+class BadFileException extends IOException {
+
+ private static final long serialVersionUID = 463201983951298129L;
+
+ BadFileException(String msg) {
+ super(msg);
+ }
+
+ BadFileException(String msg, Throwable e) {
+ super(msg, e);
+ }
+
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/BadFileException.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java Fri May 21 00:02:12 2010
@@ -0,0 +1,731 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.text.NumberFormat;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.Constants.OperationType;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Simple access layer on top of a configuration object that extracts the
+ * slive-specific configuration values needed for running slive
+ */
+class ConfigExtractor {
+
+ private static final Log LOG = LogFactory.getLog(ConfigExtractor.class);
+
+ private Configuration config;
+
+ ConfigExtractor(Configuration cfg) {
+ this.config = cfg;
+ }
+
+ /**
+ * @return the wrapped configuration that this extractor will use
+ */
+ Configuration getConfig() {
+ return this.config;
+ }
+
+ /**
+ * @return the location of where data should be written to
+ */
+ Path getDataPath() {
+ Path base = getBaseDirectory();
+ if (base == null) {
+ return null;
+ }
+ return new Path(base, Constants.DATA_DIR);
+ }
+
+ /**
+ * @return the location of where the reducer should write its data to
+ */
+ Path getOutputPath() {
+ Path base = getBaseDirectory();
+ if (base == null) {
+ return null;
+ }
+ return new Path(base, Constants.OUTPUT_DIR);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ *
+ * @return the base directory where output & data should be stored using
+ * primary,config,default (in that order)
+ */
+ Path getBaseDirectory(String primary) {
+ String path = primary;
+ if (path == null) {
+ path = config.get(ConfigOption.BASE_DIR.getCfgOption());
+ }
+ if (path == null) {
+ path = ConfigOption.BASE_DIR.getDefault();
+ }
+ if (path == null) {
+ return null;
+ }
+ return new Path(path);
+ }
+
+ /**
+ * @return the base directory using only config and default values
+ */
+ Path getBaseDirectory() {
+ return getBaseDirectory(null);
+ }
+
+ /**
+ * @return whether the mapper or reducer should exit when they get their first
+ * error using only config and default values
+ */
+ boolean shouldExitOnFirstError() {
+ return shouldExitOnFirstError(null);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ *
+ * @return whether the mapper/reducer should exit on their first error,
+ * resolved from primary, config, default (in that order)
+ */
+ boolean shouldExitOnFirstError(String primary) {
+ String val = primary;
+ if (val == null) {
+ val = config.get(ConfigOption.EXIT_ON_ERROR.getCfgOption());
+ }
+ if (val == null) {
+ return ConfigOption.EXIT_ON_ERROR.getDefault();
+ }
+ return Boolean.parseBoolean(val);
+ }
+
+ /**
+ * @return the number of reducers to use
+ */
+ Integer getReducerAmount() {
+ return 1;
+ }
+
+ /**
+ * @return the number of mappers to run, using config and default values
+ * for lookup
+ */
+ Integer getMapAmount() {
+ return getMapAmount(null);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the map amount to use
+ */
+ Integer getMapAmount(String primary) {
+ return getInteger(primary, ConfigOption.MAPS);
+ }
+
+ /**
+ * @return the duration in seconds (or null or Integer.MAX for no limit) using
+ * the configuration and default as lookup
+ */
+ Integer getDuration() {
+ return getDuration(null);
+ }
+
+ /**
+ * @return the duration in milliseconds (Integer.MAX_VALUE for no limit)
+ * using config and default as lookup
+ */
+ Integer getDurationMilliseconds() {
+ Integer seconds = getDuration();
+ if (seconds == null || seconds == Integer.MAX_VALUE) {
+ return Integer.MAX_VALUE;
+ }
+ int milliseconds = (seconds * 1000);
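+ // clamp a negative result from int overflow in (seconds * 1000)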
+ if (milliseconds < 0) {
+ milliseconds = 0;
+ }
+ return milliseconds;
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the duration in seconds (or null or Integer.MAX for no limit)
+ */
+ Integer getDuration(String primary) {
+ return getInteger(primary, ConfigOption.DURATION);
+ }
+
+ /**
+ * @return the total number of operations to run using config and default as
+ * lookup
+ */
+ Integer getOpCount() {
+ return getOpCount(null);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the total number of operations to run
+ */
+ Integer getOpCount(String primary) {
+ return getInteger(primary, ConfigOption.OPS);
+ }
+
+ /**
+ * @return the total number of files per directory using config and default as
+ * lookup
+ */
+ Integer getDirSize() {
+ return getDirSize(null);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the total number of files per directory
+ */
+ Integer getDirSize(String primary) {
+ return getInteger(primary, ConfigOption.DIR_SIZE);
+ }
+
+ /**
+ * @param primary
+ * the primary string to attempt to convert into an integer
+ * @param opt
+ * the option to use as secondary + default if no primary given
+ * @return a parsed integer
+ */
+ private Integer getInteger(String primary, ConfigOption<Integer> opt) {
+ String value = primary;
+ if (value == null) {
+ value = config.get(opt.getCfgOption());
+ }
+ if (value == null) {
+ return opt.getDefault();
+ }
+ return Integer.parseInt(value);
+ }
+
+ /**
+ * @return the total number of files allowed using configuration and default
+ * for lookup
+ */
+ Integer getTotalFiles() {
+ return getTotalFiles(null);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the total number of files allowed
+ */
+ Integer getTotalFiles(String primary) {
+ return getInteger(primary, ConfigOption.FILES);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the random seed start point or null if none
+ */
+ Long getRandomSeed(String primary) {
+ String seed = primary;
+ if (seed == null) {
+ seed = config.get(ConfigOption.RANDOM_SEED.getCfgOption());
+ }
+ if (seed == null) {
+ return null;
+ }
+ return Long.parseLong(seed);
+ }
+
+ /**
+ * @return the random seed start point or null if none using config and then
+ * default as lookup
+ */
+ Long getRandomSeed() {
+ return getRandomSeed(null);
+ }
+
+ /**
+ * @return the result file location or null if none using config and then
+ * default as lookup
+ */
+ String getResultFile() {
+ return getResultFile(null);
+ }
+
+ /**
+ * Gets the grid queue name to run on using config and default only
+ *
+ * @return String
+ */
+ String getQueueName() {
+ return getQueueName(null);
+ }
+
+ /**
+ * Gets the grid queue name to run on using the primary string or config or
+ * default
+ *
+ * @param primary
+ *
+ * @return String
+ */
+ String getQueueName(String primary) {
+ String q = primary;
+ if (q == null) {
+ q = config.get(ConfigOption.QUEUE_NAME.getCfgOption());
+ }
+ if (q == null) {
+ q = ConfigOption.QUEUE_NAME.getDefault();
+ }
+ return q;
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the result file location
+ */
+ String getResultFile(String primary) {
+ String fn = primary;
+ if (fn == null) {
+ fn = config.get(ConfigOption.RESULT_FILE.getCfgOption());
+ }
+ if (fn == null) {
+ fn = ConfigOption.RESULT_FILE.getDefault();
+ }
+ return fn;
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this
+ * configuration option (if not provided then config and then the
+ * default are used)
+ * @return the integer range allowed for the block size
+ */
+ Range<Long> getBlockSize(String primary) {
+ return getMinMaxBytes(ConfigOption.BLOCK_SIZE, primary);
+ }
+
+ /**
+ * @return the integer range allowed for the block size using config and
+ * default for lookup
+ */
+ Range<Long> getBlockSize() {
+ return getBlockSize(null);
+ }
+
+ /**
+ * @param cfgopt
+ * the configuration option to use for config and default lookup
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the parsed short range from primary, config, default
+ */
+ private Range<Short> getMinMaxShort(ConfigOption<Short> cfgopt, String primary) {
+ String sval = primary;
+ if (sval == null) {
+ sval = config.get(cfgopt.getCfgOption());
+ }
+ Range<Short> range = null;
+ if (sval != null) {
+ String pieces[] = Helper.getTrimmedStrings(sval);
+ if (pieces.length == 2) {
+ String min = pieces[0];
+ String max = pieces[1];
+ short minVal = Short.parseShort(min);
+ short maxVal = Short.parseShort(max);
+ if (minVal > maxVal) {
+ short tmp = minVal;
+ minVal = maxVal;
+ maxVal = tmp;
+ }
+ range = new Range<Short>(minVal, maxVal);
+ }
+ }
+ if (range == null) {
+ Short def = cfgopt.getDefault();
+ if (def != null) {
+ range = new Range<Short>(def, def);
+ }
+ }
+ return range;
+ }
+
+ /**
+ * @param cfgopt
+ * the configuration option to use for config and default lookup
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the parsed long range from primary, config, default
+ */
+ private Range<Long> getMinMaxLong(ConfigOption<Long> cfgopt, String primary) {
+ String sval = primary;
+ if (sval == null) {
+ sval = config.get(cfgopt.getCfgOption());
+ }
+ Range<Long> range = null;
+ if (sval != null) {
+ String pieces[] = Helper.getTrimmedStrings(sval);
+ if (pieces.length == 2) {
+ String min = pieces[0];
+ String max = pieces[1];
+ long minVal = Long.parseLong(min);
+ long maxVal = Long.parseLong(max);
+ if (minVal > maxVal) {
+ long tmp = minVal;
+ minVal = maxVal;
+ maxVal = tmp;
+ }
+ range = new Range<Long>(minVal, maxVal);
+ }
+ }
+ if (range == null) {
+ Long def = cfgopt.getDefault();
+ if (def != null) {
+ range = new Range<Long>(def, def);
+ }
+ }
+ return range;
+ }
+
+ /**
+ * @param cfgopt
+ * the configuration option to use for config and default lookup
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the parsed integer byte range from primary, config, default
+ */
+ private Range<Long> getMinMaxBytes(ConfigOption<Long> cfgopt, String primary) {
+ String sval = primary;
+ if (sval == null) {
+ sval = config.get(cfgopt.getCfgOption());
+ }
+ Range<Long> range = null;
+ if (sval != null) {
+ String pieces[] = Helper.getTrimmedStrings(sval);
+ if (pieces.length == 2) {
+ String min = pieces[0];
+ String max = pieces[1];
+ long tMin = StringUtils.TraditionalBinaryPrefix.string2long(min);
+ long tMax = StringUtils.TraditionalBinaryPrefix.string2long(max);
+ if (tMin > tMax) {
+ long tmp = tMin;
+ tMin = tMax;
+ tMax = tmp;
+ }
+ range = new Range<Long>(tMin, tMax);
+ }
+ }
+ if (range == null) {
+ Long def = cfgopt.getDefault();
+ if (def != null) {
+ range = new Range<Long>(def, def);
+ }
+ }
+ return range;
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the replication range
+ */
+ Range<Short> getReplication(String primary) {
+ return getMinMaxShort(ConfigOption.REPLICATION_AM, primary);
+ }
+
+ /**
+ * @return the replication range using config and default for lookup
+ */
+ Range<Short> getReplication() {
+ return getReplication(null);
+ }
+
+ /**
+ * @return the map of operations to perform using config (percent may be null
+ * if unspecified)
+ */
+ Map<OperationType, OperationData> getOperations() {
+ Map<OperationType, OperationData> operations = new HashMap<OperationType, OperationData>();
+ for (OperationType type : OperationType.values()) {
+ String opname = type.lowerName();
+ String keyname = String.format(Constants.OP, opname);
+ String kval = config.get(keyname);
+ if (kval == null) {
+ continue;
+ }
+ operations.put(type, new OperationData(kval));
+ }
+ return operations;
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the append byte size range (or null if none)
+ */
+ Range<Long> getAppendSize(String primary) {
+ return getMinMaxBytes(ConfigOption.APPEND_SIZE, primary);
+ }
+
+ /**
+ * @return the append byte size range (or null if none) using config and
+ * default for lookup
+ */
+ Range<Long> getAppendSize() {
+ return getAppendSize(null);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the sleep range (or null if none)
+ */
+ Range<Long> getSleepRange(String primary) {
+ return getMinMaxLong(ConfigOption.SLEEP_TIME, primary);
+ }
+
+ /**
+ * @return the sleep range (or null if none) using config and default for
+ * lookup
+ */
+ Range<Long> getSleepRange() {
+ return getSleepRange(null);
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the write byte size range (or null if none)
+ */
+ Range<Long> getWriteSize(String primary) {
+ return getMinMaxBytes(ConfigOption.WRITE_SIZE, primary);
+ }
+
+ /**
+ * @return the write byte size range (or null if none) using config and
+ * default for lookup
+ */
+ Range<Long> getWriteSize() {
+ return getWriteSize(null);
+ }
+
+ /**
+ * Returns whether the write range should use the block size range
+ *
+ * @return true|false
+ */
+ boolean shouldWriteUseBlockSize() {
+ Range<Long> writeRange = getWriteSize();
+ if (writeRange == null
+ || (writeRange.getLower() == writeRange.getUpper() && (writeRange
+ .getUpper() == Long.MAX_VALUE))) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Returns whether the append range should use the block size range
+ *
+ * @return true|false
+ */
+ boolean shouldAppendUseBlockSize() {
+ Range<Long> appendRange = getAppendSize();
+ if (appendRange == null
+ || (appendRange.getLower() == appendRange.getUpper() && (appendRange
+ .getUpper() == Long.MAX_VALUE))) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Returns whether the read range should use the entire file
+ *
+ * @return true|false
+ */
+ boolean shouldReadFullFile() {
+ Range<Long> readRange = getReadSize();
+ if (readRange == null
+ || (readRange.getLower() == readRange.getUpper() && (readRange
+ .getUpper() == Long.MAX_VALUE))) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @param primary
+ * the initial string to be used for the value of this configuration
+ * option (if not provided then config and then the default are used)
+ * @return the read byte size range (or null if none)
+ */
+ Range<Long> getReadSize(String primary) {
+ return getMinMaxBytes(ConfigOption.READ_SIZE, primary);
+ }
+
+ /**
+ * Gets the bytes per checksum (or null if not set)
+ *
+ * @return Long
+ */
+ Long getByteCheckSum() {
+ String val = config.get(Constants.BYTES_PER_CHECKSUM);
+ if (val == null) {
+ return null;
+ }
+ return Long.parseLong(val);
+ }
+
+ /**
+ * @return the read byte size range (or null if none) using config and default
+ * for lookup
+ */
+ Range<Long> getReadSize() {
+ return getReadSize(null);
+ }
+
+ /**
+ * Dumps out the given options for the given config extractor
+ *
+ * @param cfg
+ * the config to write to the log
+ */
+ static void dumpOptions(ConfigExtractor cfg) {
+ if (cfg == null) {
+ return;
+ }
+ LOG.info("Base directory = " + cfg.getBaseDirectory());
+ LOG.info("Data directory = " + cfg.getDataPath());
+ LOG.info("Output directory = " + cfg.getOutputPath());
+ LOG.info("Result file = " + cfg.getResultFile());
+ LOG.info("Grid queue = " + cfg.getQueueName());
+ LOG.info("Should exit on first error = " + cfg.shouldExitOnFirstError());
+ {
+ String duration = "Duration = ";
+ if (cfg.getDurationMilliseconds() == Integer.MAX_VALUE) {
+ duration += "unlimited";
+ } else {
+ duration += cfg.getDurationMilliseconds() + " milliseconds";
+ }
+ LOG.info(duration);
+ }
+ LOG.info("Map amount = " + cfg.getMapAmount());
+ LOG.info("Operation amount = " + cfg.getOpCount());
+ LOG.info("Total file limit = " + cfg.getTotalFiles());
+ LOG.info("Total dir file limit = " + cfg.getDirSize());
+ {
+ String read = "Read size = ";
+ if (cfg.shouldReadFullFile()) {
+ read += "entire file";
+ } else {
+ read += cfg.getReadSize() + " bytes";
+ }
+ LOG.info(read);
+ }
+ {
+ String write = "Write size = ";
+ if (cfg.shouldWriteUseBlockSize()) {
+ write += "blocksize";
+ } else {
+ write += cfg.getWriteSize() + " bytes";
+ }
+ LOG.info(write);
+ }
+ {
+ String append = "Append size = ";
+ if (cfg.shouldAppendUseBlockSize()) {
+ append += "blocksize";
+ } else {
+ append += cfg.getAppendSize() + " bytes";
+ }
+ LOG.info(append);
+ }
+ {
+ String bsize = "Block size = ";
+ bsize += cfg.getBlockSize() + " bytes";
+ LOG.info(bsize);
+ }
+ if (cfg.getRandomSeed() != null) {
+ LOG.info("Random seed = " + cfg.getRandomSeed());
+ }
+ if (cfg.getSleepRange() != null) {
+ LOG.info("Sleep range = " + cfg.getSleepRange() + " milliseconds");
+ }
+ LOG.info("Replication amount = " + cfg.getReplication());
+ LOG.info("Operations are:");
+ NumberFormat percFormatter = Formatter.getPercentFormatter();
+ Map<OperationType, OperationData> operations = cfg.getOperations();
+ for (OperationType type : operations.keySet()) {
+ String name = type.name();
+ LOG.info(name);
+ OperationData opInfo = operations.get(type);
+ LOG.info(" " + opInfo.getDistribution().name());
+ if (opInfo.getPercent() != null) {
+ LOG.info(" " + percFormatter.format(opInfo.getPercent()));
+ } else {
+ LOG.info(" ???");
+ }
+ }
+ }
+
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigExtractor.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
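All of the extractor's getters resolve values with the same precedence: an explicit primary string first, then the wrapped Configuration, then the option's default. A small sketch of that behavior (the key name and values are assumptions derived from ConfigOption; the "1m,2m" style min,max pairs are parsed via TraditionalBinaryPrefix):

    // illustrative fragment, not part of this commit
    Configuration conf = new Configuration();
    conf.set("slive.block.size", "1m,2m"); // min,max byte sizes
    ConfigExtractor extractor = new ConfigExtractor(conf);
    Range<Long> fromConfig = extractor.getBlockSize();         // 1m..2m from config
    Range<Long> fromPrimary = extractor.getBlockSize("4m,8m"); // primary wins
    ConfigExtractor.dumpOptions(extractor); // logs the resolved options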
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java Fri May 21 00:02:12 2010
@@ -0,0 +1,554 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.ArgumentParser.ParsedOutput;
+import org.apache.hadoop.fs.slive.Constants.Distribution;
+import org.apache.hadoop.fs.slive.Constants.OperationType;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Class which merges options given from a config file and the command line,
+ * performs some basic verification of the data retrieved, and sets the
+ * verified values back into the configuration object for return
+ */
+class ConfigMerger {
+ /**
+ * Exception that represents config problems...
+ */
+ static class ConfigException extends IOException {
+
+ private static final long serialVersionUID = 2047129184917444550L;
+
+ ConfigException(String msg) {
+ super(msg);
+ }
+
+ ConfigException(String msg, Throwable e) {
+ super(msg, e);
+ }
+ }
+
+ /**
+ * Merges the given command line parsed output with the given configuration
+ * object and returns the new configuration object with the correct options
+ * overwritten
+ *
+ * @param opts
+ * the parsed command line option output
+ * @param base
+ * the base configuration to merge with
+ * @return merged configuration object
+ * @throws ConfigException
+ * when configuration or verification errors occur
+ */
+ Configuration getMerged(ParsedOutput opts, Configuration base)
+ throws ConfigException {
+ return handleOptions(opts, base);
+ }
+
+ /**
+ * Gets the base set of operations to use
+ *
+ * @return Map
+ */
+ private Map<OperationType, OperationData> getBaseOperations() {
+ Map<OperationType, OperationData> base = new HashMap<OperationType, OperationData>();
+ // add in all the operations
+ // since they will all be applied unless changed
+ OperationType[] types = OperationType.values();
+ for (OperationType type : types) {
+ base.put(type, new OperationData(Distribution.UNIFORM, null));
+ }
+ return base;
+ }
+
+ /**
+ * Handles the specific task of merging operations from the command line or
+ * extractor object into the base configuration provided
+ *
+ * @param opts
+ * the parsed command line option output
+ * @param base
+ * the base configuration to merge with
+ * @param extractor
+ * the access object to fetch operations from if none from the
+ * command line
+ * @return merged configuration object
+ * @throws ConfigException
+ * when verification fails
+ */
+ private Configuration handleOperations(ParsedOutput opts, Configuration base,
+ ConfigExtractor extractor) throws ConfigException {
+ // get the base set to start off with
+ Map<OperationType, OperationData> operations = getBaseOperations();
+ // merge with what is coming from config
+ Map<OperationType, OperationData> cfgOperations = extractor.getOperations();
+ for (OperationType opType : cfgOperations.keySet()) {
+ operations.put(opType, cfgOperations.get(opType));
+ }
+ // see if any coming in from the command line
+ for (OperationType opType : OperationType.values()) {
+ String opName = opType.lowerName();
+ String opVal = opts.getValue(opName);
+ if (opVal != null) {
+ operations.put(opType, new OperationData(opVal));
+ }
+ }
+ // remove those with <= zero percent
+ {
+ Map<OperationType, OperationData> cleanedOps = new HashMap<OperationType, OperationData>();
+ for (OperationType opType : operations.keySet()) {
+ OperationData data = operations.get(opType);
+ if (data.getPercent() == null || data.getPercent() > 0.0d) {
+ cleanedOps.put(opType, data);
+ }
+ }
+ operations = cleanedOps;
+ }
+ if (operations.isEmpty()) {
+ throw new ConfigException("No operations provided!");
+ }
+ // verify and adjust
+ double currPct = 0;
+ int needFill = 0;
+ for (OperationType type : operations.keySet()) {
+ OperationData op = operations.get(type);
+ if (op.getPercent() != null) {
+ currPct += op.getPercent();
+ } else {
+ needFill++;
+ }
+ }
+ if (currPct > 1) {
+ throw new ConfigException(
+ "Unable to have accumlative percent greater than 100%");
+ }
+ if (needFill > 0 && currPct < 1) {
+ double leftOver = 1.0 - currPct;
+ Map<OperationType, OperationData> mpcp = new HashMap<OperationType, OperationData>();
+ for (OperationType type : operations.keySet()) {
+ OperationData op = operations.get(type);
+ if (op.getPercent() == null) {
+ op = new OperationData(op.getDistribution(), (leftOver / needFill));
+ }
+ mpcp.put(type, op);
+ }
+ operations = mpcp;
+ } else if (needFill == 0 && currPct < 1) {
+ // redistribute
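+ // e.g. two ops fixed at 30% and 40% leave 30% over; each gains 15%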
+ double leftOver = 1.0 - currPct;
+ Map<OperationType, OperationData> mpcp = new HashMap<OperationType, OperationData>();
+ double each = leftOver / operations.keySet().size();
+ for (OperationType t : operations.keySet()) {
+ OperationData op = operations.get(t);
+ op = new OperationData(op.getDistribution(), (op.getPercent() + each));
+ mpcp.put(t, op);
+ }
+ operations = mpcp;
+ } else if (needFill > 0 && currPct >= 1) {
+ throw new ConfigException(needFill
+ + " unfilled operations but no percentage left to fill with");
+ }
+ // save into base
+ for (OperationType opType : operations.keySet()) {
+ String opName = opType.lowerName();
+ OperationData opData = operations.get(opType);
+ String distr = opData.getDistribution().lowerName();
+ String ratio = new Double(opData.getPercent() * 100.0d).toString();
+ base.set(String.format(Constants.OP, opName), opData.toString());
+ base.set(String.format(Constants.OP_DISTR, opName), distr);
+ base.set(String.format(Constants.OP_PERCENT, opName), ratio);
+ }
+ return base;
+ }
+
+ /**
+ * Handles merging all options and verifying from the given command line
+ * output and the given base configuration and returns the merged
+ * configuration
+ *
+ * @param opts
+ * the parsed command line option output
+ * @param base
+ * the base configuration to merge with
+ * @return the merged configuration
+ * @throws ConfigException
+ */
+ private Configuration handleOptions(ParsedOutput opts, Configuration base)
+ throws ConfigException {
+ // ensure variables are overwritten and verified
+ ConfigExtractor extractor = new ConfigExtractor(base);
+ // overwrite the map amount and check to ensure > 0
+ {
+ Integer mapAmount = null;
+ try {
+ mapAmount = extractor.getMapAmount(opts.getValue(ConfigOption.MAPS
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException("Error extracting & merging map amount", e);
+ }
+ if (mapAmount != null) {
+ if (mapAmount <= 0) {
+ throw new ConfigException(
+ "Map amount can not be less than or equal to zero");
+ }
+ base.set(ConfigOption.MAPS.getCfgOption(), mapAmount.toString());
+ }
+ }
+ // overwrite the duration amount and ensure > 0
+ {
+ Integer duration = null;
+ try {
+ duration = extractor.getDuration(opts.getValue(ConfigOption.DURATION
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException("Error extracting & merging duration", e);
+ }
+ if (duration != null) {
+ if (duration <= 0) {
+ throw new ConfigException(
+ "Duration can not be less than or equal to zero");
+ }
+ base.set(ConfigOption.DURATION.getCfgOption(), duration.toString());
+ }
+ }
+ // overwrite the operation amount and ensure > 0
+ {
+ Integer operationAmount = null;
+ try {
+ operationAmount = extractor.getOpCount(opts.getValue(ConfigOption.OPS
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging operation amount", e);
+ }
+ if (operationAmount != null) {
+ if (operationAmount <= 0) {
+ throw new ConfigException(
+ "Operation amount can not be less than or equal to zero");
+ }
+ base.set(ConfigOption.OPS.getCfgOption(), operationAmount.toString());
+ }
+ }
+ // overwrite the exit on error setting
+ {
+ try {
+ boolean exitOnError = extractor.shouldExitOnFirstError(opts
+ .getValue(ConfigOption.EXIT_ON_ERROR.getOpt()));
+ base.setBoolean(ConfigOption.EXIT_ON_ERROR.getCfgOption(), exitOnError);
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging exit on error value", e);
+ }
+ }
+ // verify and set file limit and ensure > 0
+ {
+ Integer fileAm = null;
+ try {
+ fileAm = extractor.getTotalFiles(opts.getValue(ConfigOption.FILES
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging total file limit amount", e);
+ }
+ if (fileAm != null) {
+ if (fileAm <= 0) {
+ throw new ConfigException(
+ "File amount can not be less than or equal to zero");
+ }
+ base.set(ConfigOption.FILES.getCfgOption(), fileAm.toString());
+ }
+ }
+ // set the grid queue to run on
+ {
+ try {
+ String qname = extractor.getQueueName(opts
+ .getValue(ConfigOption.QUEUE_NAME.getOpt()));
+ if (qname != null) {
+ base.set(ConfigOption.QUEUE_NAME.getCfgOption(), qname);
+ }
+ } catch (Exception e) {
+ throw new ConfigException("Error extracting & merging queue name", e);
+ }
+ }
+ // verify and set the directory limit and ensure > 0
+ {
+ Integer directoryLimit = null;
+ try {
+ directoryLimit = extractor.getDirSize(opts
+ .getValue(ConfigOption.DIR_SIZE.getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging directory file limit", e);
+ }
+ if (directoryLimit != null) {
+ if (directoryLimit <= 0) {
+ throw new ConfigException(
+ "Directory file limit can not be less than or equal to zero");
+ }
+ base.set(ConfigOption.DIR_SIZE.getCfgOption(), directoryLimit
+ .toString());
+ }
+ }
+ // set the base directory
+ {
+ Path basedir = null;
+ try {
+ basedir = extractor.getBaseDirectory(opts
+ .getValue(ConfigOption.BASE_DIR.getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException("Error extracting & merging base directory",
+ e);
+ }
+ if (basedir != null) {
+ // always ensure in slive dir
+ basedir = new Path(basedir, Constants.BASE_DIR);
+ base.set(ConfigOption.BASE_DIR.getCfgOption(), basedir.toString());
+ }
+ }
+ // set the result file
+ {
+ String fn = null;
+ try {
+ fn = extractor.getResultFile(opts.getValue(ConfigOption.RESULT_FILE
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException("Error extracting & merging result file", e);
+ }
+ if (fn != null) {
+ base.set(ConfigOption.RESULT_FILE.getCfgOption(), fn);
+ }
+ }
+ // set the operations
+ {
+ try {
+ base = handleOperations(opts, base, extractor);
+ } catch (Exception e) {
+ throw new ConfigException("Error extracting & merging operations", e);
+ }
+ }
+ // set the replication amount range
+ {
+ Range<Short> replicationAm = null;
+ try {
+ replicationAm = extractor.getReplication(opts
+ .getValue(ConfigOption.REPLICATION_AM.getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging replication amount range", e);
+ }
+ if (replicationAm != null) {
+ int minRepl = base.getInt(Constants.MIN_REPLICATION, 1);
+ if (replicationAm.getLower() < minRepl) {
+ throw new ConfigException(
+ "Replication amount minimum is less than property configured minimum "
+ + minRepl);
+ }
+ if (replicationAm.getLower() > replicationAm.getUpper()) {
+ throw new ConfigException(
+ "Replication amount minimum is greater than its maximum");
+ }
+ if (replicationAm.getLower() <= 0) {
+ throw new ConfigException(
+ "Replication amount minimum must be greater than zero");
+ }
+ base.set(ConfigOption.REPLICATION_AM.getCfgOption(), replicationAm
+ .toString());
+ }
+ }
+ // set the sleep range
+ {
+ Range<Long> sleepRange = null;
+ try {
+ sleepRange = extractor.getSleepRange(opts
+ .getValue(ConfigOption.SLEEP_TIME.getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging sleep size range", e);
+ }
+ if (sleepRange != null) {
+ if (sleepRange.getLower() > sleepRange.getUpper()) {
+ throw new ConfigException(
+ "Sleep range minimum is greater than its maximum");
+ }
+ if (sleepRange.getLower() <= 0) {
+ throw new ConfigException(
+ "Sleep range minimum must be greater than zero");
+ }
+ base.set(ConfigOption.SLEEP_TIME.getCfgOption(), sleepRange.toString());
+ }
+ }
+ // set the packet size if given
+ {
+ String pSize = opts.getValue(ConfigOption.PACKET_SIZE.getOpt());
+ if (pSize == null) {
+ pSize = ConfigOption.PACKET_SIZE.getDefault();
+ }
+ if (pSize != null) {
+ try {
+ Long packetSize = StringUtils.TraditionalBinaryPrefix
+ .string2long(pSize);
+ base.set(ConfigOption.PACKET_SIZE.getCfgOption(), packetSize
+ .toString());
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging write packet size", e);
+ }
+ }
+ }
+ // set the block size range
+ {
+ Range<Long> blockSize = null;
+ try {
+ blockSize = extractor.getBlockSize(opts
+ .getValue(ConfigOption.BLOCK_SIZE.getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging block size range", e);
+ }
+ if (blockSize != null) {
+ if (blockSize.getLower() > blockSize.getUpper()) {
+ throw new ConfigException(
+ "Block size minimum is greater than its maximum");
+ }
+ if (blockSize.getLower() <= 0) {
+ throw new ConfigException(
+ "Block size minimum must be greater than zero");
+ }
+ // ensure block size is a multiple of BYTES_PER_CHECKSUM
+ // if a value is set in the configuration
+ Long bytesPerChecksum = extractor.getByteCheckSum();
+ if (bytesPerChecksum != null) {
+ if ((blockSize.getLower() % bytesPerChecksum) != 0) {
+ throw new ConfigException(
+ "Blocksize lower bound must be a multiple of "
+ + bytesPerChecksum);
+ }
+ if ((blockSize.getUpper() % bytesPerChecksum) != 0) {
+ throw new ConfigException(
+ "Blocksize upper bound must be a multiple of "
+ + bytesPerChecksum);
+ }
+ }
+ base.set(ConfigOption.BLOCK_SIZE.getCfgOption(), blockSize.toString());
+ }
+ }
+ // set the read size range
+ {
+ Range<Long> readSize = null;
+ try {
+ readSize = extractor.getReadSize(opts.getValue(ConfigOption.READ_SIZE
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException("Error extracting & merging read size range",
+ e);
+ }
+ if (readSize != null) {
+ if (readSize.getLower() > readSize.getUpper()) {
+ throw new ConfigException(
+ "Read size minimum is greater than its maximum");
+ }
+ if (readSize.getLower() < 0) {
+ throw new ConfigException(
+ "Read size minimum must be greater than or equal to zero");
+ }
+ base.set(ConfigOption.READ_SIZE.getCfgOption(), readSize.toString());
+ }
+ }
+ // set the write size range
+ {
+ Range<Long> writeSize = null;
+ try {
+ writeSize = extractor.getWriteSize(opts
+ .getValue(ConfigOption.WRITE_SIZE.getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging write size range", e);
+ }
+ if (writeSize != null) {
+ if (writeSize.getLower() > writeSize.getUpper()) {
+ throw new ConfigException(
+ "Write size minimum is greater than its maximum");
+ }
+ if (writeSize.getLower() < 0) {
+ throw new ConfigException(
+ "Write size minimum must be greater than or equal to zero");
+ }
+ base.set(ConfigOption.WRITE_SIZE.getCfgOption(), writeSize.toString());
+ }
+ }
+ // set the append size range
+ {
+ Range<Long> appendSize = null;
+ try {
+ appendSize = extractor.getAppendSize(opts
+ .getValue(ConfigOption.APPEND_SIZE.getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging append size range", e);
+ }
+ if (appendSize != null) {
+ if (appendSize.getLower() > appendSize.getUpper()) {
+ throw new ConfigException(
+ "Append size minimum is greater than its maximum");
+ }
+ if (appendSize.getLower() < 0) {
+ throw new ConfigException(
+ "Append size minimum must be greater than or equal to zero");
+ }
+ base
+ .set(ConfigOption.APPEND_SIZE.getCfgOption(), appendSize.toString());
+ }
+ }
+ // set the seed
+ {
+ Long seed = null;
+ try {
+ seed = extractor.getRandomSeed(opts.getValue(ConfigOption.RANDOM_SEED
+ .getOpt()));
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Error extracting & merging random number seed", e);
+ }
+ if (seed != null) {
+ base.set(ConfigOption.RANDOM_SEED.getCfgOption(), seed.toString());
+ }
+ }
+ return base;
+ }
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigMerger.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
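Putting the pieces together, a sketch of the merge flow (hypothetical glue, not part of this commit; SliveTest is the actual entry point and exceptions are left unhandled here). Note how unspecified operation percentages are filled from what is left over: if one operation is pinned at 40% and four others carry no percentage, each of the four receives a quarter of the remaining 60%, i.e. 15%.

    // illustrative fragment
    ArgumentParser.ParsedOutput opts = new ArgumentParser(args).parse();
    Configuration merged = new ConfigMerger().getMerged(opts, new Configuration());
    ConfigExtractor extractor = new ConfigExtractor(merged);
    ConfigExtractor.dumpOptions(extractor); // downstream code reads the verified values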
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java Fri May 21 00:02:12 2010
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import org.apache.commons.cli.Option;
+
+/**
+ * Option subclass which adds a Hadoop configuration property name and a
+ * default value, so that both can be retrieved from one central place as
+ * needed
+ */
+class ConfigOption<T> extends Option {
+
+ private static final long serialVersionUID = 7218954906367671150L;
+
+ // config starts with this prefix
+ private static final String SLIVE_PREFIX = "slive";
+
+ // command line options with descriptions and config option names
+ static final ConfigOption<Integer> MAPS = new ConfigOption<Integer>(
+ "maps", true, "Number of maps", SLIVE_PREFIX + " .maps", 10);
+
+ static final ConfigOption<Integer> OPS = new ConfigOption<Integer>(
+ "ops", true, "Max number of operations per map", SLIVE_PREFIX
+ + ".map.ops", 1000);
+
+ static final ConfigOption<Integer> DURATION = new ConfigOption<Integer>(
+ "duration", true,
+ "Duration of a map task in seconds (MAX_INT for no limit)", SLIVE_PREFIX
+ + ".duration", Integer.MAX_VALUE);
+
+ static final ConfigOption<Boolean> EXIT_ON_ERROR = new ConfigOption<Boolean>(
+ "exitOnError", false, "Exit on first error", SLIVE_PREFIX
+ + ".exit.on.error", false);
+
+ static final ConfigOption<Integer> FILES = new ConfigOption<Integer>(
+ "files", true, "Max total number of files",
+ SLIVE_PREFIX + ".total.files", 10);
+
+ static final ConfigOption<Integer> DIR_SIZE = new ConfigOption<Integer>(
+ "dirSize", true, "Max files per directory", SLIVE_PREFIX + ".dir.size",
+ 32);
+
+ static final ConfigOption<String> BASE_DIR = new ConfigOption<String>(
+ "baseDir", true, "Base directory path", SLIVE_PREFIX + ".base.dir",
+ "/test/slive");
+
+ static final ConfigOption<String> RESULT_FILE = new ConfigOption<String>(
+ "resFile", true, "Result file name", SLIVE_PREFIX + ".result.file",
+ "part-0000");
+
+ static final ConfigOption<Short> REPLICATION_AM = new ConfigOption<Short>(
+ "replication", true, "Min,max value for replication amount", SLIVE_PREFIX
+ + ".file.replication", (short) 3);
+
+ static final ConfigOption<Long> BLOCK_SIZE = new ConfigOption<Long>(
+ "blockSize", true, "Min,max for dfs file block size", SLIVE_PREFIX
+ + ".block.size", 64L * Constants.MEGABYTES);
+
+ static final ConfigOption<Long> READ_SIZE = new ConfigOption<Long>(
+ "readSize", true,
+ "Min,max for size to read (min=max=MAX_LONG=read entire file)",
+ SLIVE_PREFIX + ".op.read.size", null);
+
+ static final ConfigOption<Long> WRITE_SIZE = new ConfigOption<Long>(
+ "writeSize", true,
+ "Min,max for size to write (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
+ + ".op.write.size", null);
+
+ static final ConfigOption<Long> SLEEP_TIME = new ConfigOption<Long>(
+ "sleep",
+ true,
+ "Min,max for millisecond of random sleep to perform (between operations)",
+ SLIVE_PREFIX + ".op.sleep.range", null);
+
+ static final ConfigOption<Long> APPEND_SIZE = new ConfigOption<Long>(
+ "appendSize", true,
+ "Min,max for size to append (min=max=MAX_LONG=blocksize)", SLIVE_PREFIX
+ + ".op.append.size", null);
+
+ static final ConfigOption<Long> RANDOM_SEED = new ConfigOption<Long>(
+ "seed", true, "Random number seed", SLIVE_PREFIX + ".seed", null);
+
+ // command line only options
+ static final Option HELP = new Option("help", false,
+ "Usage information");
+
+ static final Option CLEANUP = new Option("cleanup", true,
+ "Cleanup & remove directory after reporting");
+
+ // non slive specific settings
+ static final ConfigOption<String> QUEUE_NAME = new ConfigOption<String>(
+ "queue", true, "Queue name", "mapred.job.queue.name", "default");
+
+ static final ConfigOption<String> PACKET_SIZE = new ConfigOption<String>(
+ "packetSize", true, "Dfs write packet size", "dfs.write.packet.size",
+ null);
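+
+ // Hedged note (an assumption drawn from the descriptions above; the actual
+ // parsing lives in ConfigExtractor): each "Min,max" option is supplied as
+ // one comma separated pair on the command line, e.g.
+ //
+ // -writeSize 1024,1048576
+ //
+ // which would become a Range with lower=1024 and upper=1048576.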
+
+ /**
+ * Hadoop configuration property name
+ */
+ private String cfgOption;
+
+ /**
+ * Default value if no value is located by other means
+ */
+ private T defaultValue;
+
+ ConfigOption(String cliOption, boolean hasArg, String description,
+ String cfgOption, T def) {
+ super(cliOption, hasArg, description);
+ this.cfgOption = cfgOption;
+ this.defaultValue = def;
+ }
+
+ /**
+ * @return the configuration option name to lookup in Configuration objects
+ * for this option
+ */
+ String getCfgOption() {
+ return cfgOption;
+ }
+
+ /**
+ * @return the default object for this option
+ */
+ T getDefault() {
+ return defaultValue;
+ }
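+
+ // Hedged usage sketch (an assumed caller pattern, not part of this file):
+ // a value is typically resolved from the Configuration first, falling back
+ // to the default stored here, e.g.
+ //
+ // Configuration cfg = new Configuration();
+ // int maps = cfg.getInt(ConfigOption.MAPS.getCfgOption(),
+ // ConfigOption.MAPS.getDefault());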
+
+}
\ No newline at end of file
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/ConfigOption.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java Fri May 21 00:02:12 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+/**
+ * Constants used in various places in slive
+ */
+class Constants {
+
+ /**
+ * This class contains static members only - no construction allowed
+ */
+ private Constants() {
+ }
+
+ /**
+ * The distributions supported (or that may be supported)
+ */
+ enum Distribution {
+ BEG, END, UNIFORM, MID;
+ String lowerName() {
+ return this.name().toLowerCase();
+ }
+ }
+
+ /**
+ * Allowed operation types
+ */
+ enum OperationType {
+ READ, APPEND, RENAME, LS, MKDIR, DELETE, CREATE;
+ String lowerName() {
+ return this.name().toLowerCase();
+ }
+ }
+
+ // program info
+ static final String PROG_NAME = SliveTest.class.getSimpleName();
+ static final String PROG_VERSION = "0.0.1";
+
+ // useful constants
+ static final int MEGABYTES = 1048576;
+
+ // must be a multiple of BYTES_PER_LONG -
+ // used for reading and writing buffer sizes
+ static final int BUFFERSIZE = 64 * 1024;
+
+ // 8 bytes per long
+ static final int BYTES_PER_LONG = 8;
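+
+ // (sanity check, illustrative: 64 * 1024 = 65536 bytes is exactly 8192
+ // longs, so BUFFERSIZE above satisfies the multiple constraint)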
+
+ // used for finding the reducer file for a given number
+ static final String REDUCER_FILE = "part-%s";
+
+ // this is used to ensure the blocksize is a multiple of this config setting
+ static final String BYTES_PER_CHECKSUM = "io.bytes.per.checksum";
+
+ // min replication setting for verification
+ static final String MIN_REPLICATION = "dfs.namenode.replication.min";
+
+ // used for getting an option description given a set of distributions
+ // to substitute
+ static final String OP_DESCR = "pct,distribution where distribution is one of %s";
+
+ // keys for looking up a specific operation in the hadoop config
+ static final String OP_PERCENT = "slive.op.%s.pct";
+ static final String OP = "slive.op.%s";
+ static final String OP_DISTR = "slive.op.%s.dist";
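+
+ // illustrative example: these format keys combine with the enum lowerName()
+ // helpers above, e.g. String.format(OP_PERCENT,
+ // OperationType.READ.lowerName()) yields "slive.op.read.pct"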
+
+ // path constants
+ static final String BASE_DIR = "slive";
+ static final String DATA_DIR = "data";
+ static final String OUTPUT_DIR = "output";
+
+ // whether whenever data is written a flush should occur
+ static final boolean FLUSH_WRITES = false;
+
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/Constants.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java Fri May 21 00:02:12 2010
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.slive.DataWriter.GenerateOutput;
+import org.apache.hadoop.fs.slive.OperationOutput.OutputType;
+
+/**
+ * Operation which selects a random file, a random number of bytes to create
+ * it with (from the write size option), a random block size (from the block
+ * size option) and a random replication amount (from the replication option),
+ * and then attempts to create a file with those settings.
+ *
+ * On success this operation captures statistics on bytes written, time taken
+ * (milliseconds) and success count; on failure it captures the number of
+ * failures and the time taken (milliseconds) to fail.
+ */
+class CreateOp extends Operation {
+
+ private static final Log LOG = LogFactory.getLog(CreateOp.class);
+
+ private static final int DEF_IO_BUFFER_SIZE = 4096;
+
+ private static final String IO_BUF_CONFIG = "io.file.buffer.size";
+
+ CreateOp(ConfigExtractor cfg, Random rnd) {
+ super(CreateOp.class.getSimpleName(), cfg, rnd);
+ }
+
+ /**
+ * Returns the block size to use, aligned to the nearest BYTES_PER_CHECKSUM
+ * multiple when the configuration provides that value - alignment avoids
+ * the warnings caused by misaligned sizes, and the file will not be created
+ * if the block size is not correct
+ *
+ * @return long
+ */
+ private long determineBlockSize() {
+ Range<Long> blockSizeRange = getConfig().getBlockSize();
+ long blockSize = Range.betweenPositive(getRandom(), blockSizeRange);
+ Long byteChecksum = getConfig().getByteCheckSum();
+ if (byteChecksum == null) {
+ return blockSize;
+ }
+ // adjust to nearest multiple
+ long full = (blockSize / byteChecksum) * byteChecksum;
+ long toFull = blockSize - full;
+ if (toFull >= (byteChecksum / 2)) {
+ full += byteChecksum;
+ }
+ // adjust if over extended
+ if (full > blockSizeRange.getUpper()) {
+ full = blockSizeRange.getUpper();
+ }
+ if (full < blockSizeRange.getLower()) {
+ full = blockSizeRange.getLower();
+ }
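+ // worked example (illustrative): blockSize = 1000 with byteChecksum = 512
+ // gives full = (1000 / 512) * 512 = 512 and toFull = 488 >= 256, so full
+ // rounds up to 1024 before the range clamp above applies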
+ return full;
+ }
+
+ /**
+ * Gets the replication amount
+ *
+ * @return short
+ */
+ private short determineReplication() {
+ Range<Short> replicationAmountRange = getConfig().getReplication();
+ Range<Long> repRange = new Range<Long>(replicationAmountRange.getLower()
+ .longValue(), replicationAmountRange.getUpper().longValue());
+ short replicationAmount = (short) Range.betweenPositive(getRandom(),
+ repRange);
+ return replicationAmount;
+ }
+
+ /**
+ * Gets the output buffering size to use
+ *
+ * @return int
+ */
+ private int getBufferSize() {
+ return getConfig().getConfig().getInt(IO_BUF_CONFIG, DEF_IO_BUFFER_SIZE);
+ }
+
+ /**
+ * Gets the file to create
+ *
+ * @return Path
+ */
+ protected Path getCreateFile() {
+ Path fn = getFinder().getFile();
+ return fn;
+ }
+
+ @Override // Operation
+ List<OperationOutput> run(FileSystem fs) {
+ List<OperationOutput> out = super.run(fs);
+ FSDataOutputStream os = null;
+ try {
+ Path fn = getCreateFile();
+ Range<Long> writeSizeRange = getConfig().getWriteSize();
+ long writeSize = 0;
+ long blockSize = determineBlockSize();
+ short replicationAmount = determineReplication();
+ if (getConfig().shouldWriteUseBlockSize()) {
+ writeSizeRange = getConfig().getBlockSize();
+ }
+ writeSize = Range.betweenPositive(getRandom(), writeSizeRange);
+ long bytesWritten = 0;
+ long timeTaken = 0;
+ int bufSize = getBufferSize();
+ boolean overWrite = false;
+ DataWriter writer = new DataWriter(getRandom());
+ LOG.info("Attempting to create file at " + fn + " of size "
+ + Helper.toByteInfo(writeSize) + " using blocksize "
+ + Helper.toByteInfo(blockSize) + " and replication amount "
+ + replicationAmount);
+ {
+ // open & create
+ long startTime = Timer.now();
+ os = fs.create(fn, overWrite, bufSize, replicationAmount, blockSize);
+ timeTaken += Timer.elapsed(startTime);
+ // write the given length
+ GenerateOutput stats = writer.writeSegment(writeSize, os);
+ bytesWritten += stats.getBytesWritten();
+ timeTaken += stats.getTimeTaken();
+ // capture close time
+ startTime = Timer.now();
+ os.close();
+ os = null;
+ timeTaken += Timer.elapsed(startTime);
+ }
+ LOG.info("Created file at " + fn + " of size "
+ + Helper.toByteInfo(bytesWritten) + " bytes using blocksize "
+ + Helper.toByteInfo(blockSize) + " and replication amount "
+ + replicationAmount + " in " + timeTaken + " milliseconds");
+ // collect all the stats
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.OK_TIME_TAKEN, timeTaken));
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.BYTES_WRITTEN, bytesWritten));
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.SUCCESSES, 1L));
+ } catch (IOException e) {
+ out.add(new OperationOutput(OutputType.LONG, getType(),
+ ReportWriter.FAILURES, 1L));
+ LOG.warn("Error with creating", e);
+ } finally {
+ if (os != null) {
+ try {
+ os.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing create stream", e);
+ }
+ }
+ }
+ return out;
+ }
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/CreateOp.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java?rev=946832&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java Fri May 21 00:02:12 2010
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.slive;
+
+import java.util.Random;
+
+/**
+ * Class used to generate the data to write for a given file and byte offset,
+ * and later to verify that the expected value is read back at that same
+ * offset
+ */
+class DataHasher {
+
+ private Random rnd;
+
+ DataHasher(long mixIn) {
+ this.rnd = new Random(mixIn);
+ }
+
+ /**
+ * @param offSet
+ * the byte offset into the file
+ *
+ * @return the data to be expected at that offset
+ */
+ long generate(long offSet) {
+ return ((offSet * 47) ^ (rnd.nextLong() * 97)) * 37;
+ }
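+
+ // Hedged note (an assumption about intended use, not stated in this file):
+ // verification must recreate a DataHasher with the same mixIn seed and
+ // replay offsets in the same order, since each generate() call also
+ // advances the internal Random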
+
+}
Propchange: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/fs/slive/DataHasher.java
------------------------------------------------------------------------------
svn:mime-type = text/plain