Posted to mapreduce-commits@hadoop.apache.org by sc...@apache.org on 2010/11/23 00:44:34 UTC
svn commit: r1037944 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
Author: schen
Date: Mon Nov 22 23:44:34 2010
New Revision: 1037944
URL: http://svn.apache.org/viewvc?rev=1037944&view=rev
Log:
MAPREDUCE-2184. Port DistRaid.java to new mapreduce API. (Ramkumar Vadali via
schen)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1037944&r1=1037943&r2=1037944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Mon Nov 22 23:44:34 2010
@@ -190,6 +190,9 @@ Release 0.22.0 - Unreleased
MAPREDUCE-1931. Gridmix forrest documentation . (Ranjit Mathew via vinodkv).
+ MAPREDUCE-2184. Port DistRaid.java to new mapreduce API. (Ramkumar Vadali
+ via schen)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=1037944&r1=1037943&r2=1037944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Mon Nov 22 23:44:34 2010
@@ -28,68 +28,48 @@ import java.text.SimpleDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
-import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Reader;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.raid.RaidNode.Statistics;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.util.StringUtils;
-public class DistRaid {
+public class DistRaid extends Configured {
protected static final Log LOG = LogFactory.getLog(DistRaid.class);
static final String NAME = "distRaid";
static final String JOB_DIR_LABEL = NAME + ".job.dir";
- static final String OP_LIST_LABEL = NAME + ".op.list";
- static final String OP_COUNT_LABEL = NAME + ".op.count";
static final int OP_LIST_BLOCK_SIZE = 32 * 1024 * 1024; // block size of control file
static final short OP_LIST_REPLICATION = 10; // replication factor of control file
- private static final long OP_PER_MAP = 100;
- private static final int MAX_MAPS_PER_NODE = 20;
+ public static final String OPS_PER_TASK = "raid.distraid.opspertask";
+ private static final int DEFAULT_OPS_PER_TASK = 100;
private static final int SYNC_FILE_MAX = 10;
private static final SimpleDateFormat dateForm = new SimpleDateFormat("yyyy-MM-dd HH:mm");
- private static String jobName = NAME;
static enum Counter {
FILES_SUCCEEDED, FILES_FAILED, PROCESSED_BLOCKS, PROCESSED_SIZE, META_BLOCKS, META_SIZE
}
- protected JobConf jobconf;
-
- /** {@inheritDoc} */
- public void setConf(Configuration conf) {
- if (jobconf != conf) {
- jobconf = conf instanceof JobConf ? (JobConf) conf : new JobConf(conf);
- }
- }
-
- /** {@inheritDoc} */
- public JobConf getConf() {
- return jobconf;
- }
-
public DistRaid(Configuration conf) {
- setConf(createJobConf(conf));
+ super(conf);
}
private static final Random RANDOM = new Random();
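For context on the hunk above: the hand-rolled setConf()/getConf() pair disappears
because org.apache.hadoop.conf.Configured already provides both. A minimal sketch
of that idiom, with an illustrative class name (not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;

    // Configured stores the Configuration and supplies getConf()/setConf(),
    // so the subclass only needs to forward the constructor argument.
    public class ExampleTool extends Configured {
      public ExampleTool(Configuration conf) {
        super(conf); // replaces the hand-rolled setConf(createJobConf(conf))
      }
    }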
@@ -99,9 +79,9 @@ public class DistRaid {
}
/**
- *
+ *
* helper class which holds the policy and paths
- *
+ *
*/
public static class RaidPolicyPathPair {
public PolicyInfo policy;
@@ -115,127 +95,102 @@ public class DistRaid {
List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
- private JobClient jobClient;
- private RunningJob runningJob;
- private int jobEventCounter = 0;
+ private Job runningJob;
private String lastReport = null;
/** Responsible for generating splits of the src file list. */
- static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
- /** Do nothing. */
- public void validateInput(JobConf job) {
- }
-
+ static class DistRaidInputFormat extends
+ SequenceFileInputFormat<Text, PolicyInfo> {
/**
* Produce splits such that each is no greater than the quotient of the
* total size and the number of splits requested.
- *
+ *
* @param job
- * The handle to the JobConf object
+ * The handle to the Configuration object
* @param numSplits
* Number of splits requested
*/
- public InputSplit[] getSplits(JobConf job, int numSplits)
- throws IOException {
- final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
- final int targetcount = srcCount / numSplits;
- String srclist = job.get(OP_LIST_LABEL, "");
- if (srcCount < 0 || "".equals(srclist)) {
- throw new RuntimeException("Invalid metadata: #files(" + srcCount
- + ") listuri(" + srclist + ")");
- }
- Path srcs = new Path(srclist);
- FileSystem fs = srcs.getFileSystem(job);
-
- List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+ public List<InputSplit> getSplits(JobContext job) throws IOException {
+ Configuration conf = job.getConfiguration();
- Text key = new Text();
- PolicyInfo value = new PolicyInfo();
- SequenceFile.Reader in = null;
+ // We create only one input file. So just get the first file in the first
+ // input directory.
+ Path inDir = getInputPaths(job)[0];
+ FileSystem fs = inDir.getFileSystem(conf);
+ FileStatus[] inputFiles = fs.listStatus(inDir);
+ Path inputFile = inputFiles[0].getPath();
+
+ List<InputSplit> splits = new ArrayList<InputSplit>();
+ SequenceFile.Reader in =
+ new SequenceFile.Reader(conf, Reader.file(inputFile));
long prev = 0L;
- int count = 0; // count src
+ final int opsPerTask = conf.getInt(OPS_PER_TASK, DEFAULT_OPS_PER_TASK);
try {
- for (in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value);) {
+ Text key = new Text();
+ PolicyInfo value = new PolicyInfo();
+ int count = 0; // count src
+ while (in.next(key, value)) {
long curr = in.getPosition();
long delta = curr - prev;
- if (++count > targetcount) {
+ if (++count > opsPerTask) {
count = 0;
- splits.add(new FileSplit(srcs, prev, delta, (String[]) null));
+ splits.add(new FileSplit(inputFile, prev, delta, (String[]) null));
prev = curr;
}
}
} finally {
in.close();
}
- long remaining = fs.getFileStatus(srcs).getLen() - prev;
+ long remaining = fs.getFileStatus(inputFile).getLen() - prev;
if (remaining != 0) {
- splits.add(new FileSplit(srcs, prev, remaining, (String[]) null));
+ splits.add(new FileSplit(inputFile, prev, remaining, (String[]) null));
}
- LOG.info("jobname= " + jobName + " numSplits=" + numSplits +
- ", splits.size()=" + splits.size());
- return splits.toArray(new FileSplit[splits.size()]);
- }
-
- /** {@inheritDoc} */
- public RecordReader<Text, PolicyInfo> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter) throws IOException {
- return new SequenceFileRecordReader<Text, PolicyInfo>(job,
- (FileSplit) split);
+ return splits;
}
}
/** The mapper for raiding files. */
- static class DistRaidMapper implements
- Mapper<Text, PolicyInfo, WritableComparable, Text> {
- private JobConf jobconf;
- private boolean ignoreFailures;
+ static class DistRaidMapper extends Mapper<Text, PolicyInfo, Text, Text> {
+ private boolean ignoreFailures = false;
private int failcount = 0;
private int succeedcount = 0;
- private Statistics st = null;
- private Reporter reporter = null;
+ private Statistics st = new Statistics();
private String getCountString() {
return "Succeeded: " + succeedcount + " Failed: " + failcount;
}
- /** {@inheritDoc} */
- public void configure(JobConf job) {
- this.jobconf = job;
- ignoreFailures = false;
- st = new Statistics();
- }
-
- /** Run a FileOperation */
- public void map(Text key, PolicyInfo policy,
- OutputCollector<WritableComparable, Text> out, Reporter reporter)
- throws IOException {
- this.reporter = reporter;
+ /** Run a FileOperation
+ * @throws IOException
+ * @throws InterruptedException */
+ public void map(Text key, PolicyInfo policy, Context context)
+ throws IOException, InterruptedException {
try {
+ Configuration jobConf = context.getConfiguration();
LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
Path p = new Path(key.toString());
- FileStatus fs = p.getFileSystem(jobconf).getFileStatus(p);
+ FileStatus fs = p.getFileSystem(jobConf).getFileStatus(p);
st.clear();
- RaidNode.doRaid(jobconf, policy, fs, st, reporter);
+ RaidNode.doRaid(jobConf, policy, fs, st, context);
++succeedcount;
- reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
- reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
- reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
- reporter.incrCounter(Counter.META_SIZE, st.metaSize);
-
- reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
+ context.getCounter(Counter.PROCESSED_BLOCKS).increment(st.numProcessedBlocks);
+ context.getCounter(Counter.PROCESSED_SIZE).increment(st.processedSize);
+ context.getCounter(Counter.META_BLOCKS).increment(st.numMetaBlocks);
+ context.getCounter(Counter.META_SIZE).increment(st.metaSize);
+ context.getCounter(Counter.FILES_SUCCEEDED).increment(1);
} catch (IOException e) {
++failcount;
- reporter.incrCounter(Counter.FILES_FAILED, 1);
+ context.getCounter(Counter.FILES_FAILED).increment(1);
String s = "FAIL: " + policy + ", " + key + " "
+ StringUtils.stringifyException(e);
- out.collect(null, new Text(s));
+ context.write(new Text(key), new Text(s));
LOG.info(s);
} finally {
- reporter.setStatus(getCountString());
+ context.setStatus(getCountString());
}
}
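The mapper rewrite above is the standard old-to-new API translation: Mapper
becomes a class to extend rather than an interface to implement, and the
OutputCollector/Reporter pair collapses into a single Context argument. A
minimal sketch of the resulting shape, with illustrative names rather than the
committed code:

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class ExampleMapper extends Mapper<Text, Text, Text, Text> {
      static enum Counter { FILES_SUCCEEDED, FILES_FAILED }

      @Override
      public void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
        try {
          // ... process one record ...
          context.getCounter(Counter.FILES_SUCCEEDED).increment(1);
        } catch (IOException e) {
          context.getCounter(Counter.FILES_FAILED).increment(1);
          context.write(key, new Text("FAIL: " + e)); // was out.collect(...)
        } finally {
          context.setStatus("processed " + key);      // was reporter.setStatus(...)
        }
      }
    }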
@@ -267,26 +222,27 @@ public class DistRaid {
}
/**
- * create new job conf based on configuration passed.
- *
+ * Creates a new Job object.
* @param conf
- * @return
+ * @return a Job object
+ * @throws IOException
*/
- static JobConf createJobConf(Configuration conf) {
- JobConf jobconf = new JobConf(conf, DistRaid.class);
- jobName = NAME + " " + dateForm.format(new Date(RaidNode.now()));
- jobconf.setJobName(jobName);
- jobconf.setMapSpeculativeExecution(false);
- setSchedulerOption(jobconf);
-
- jobconf.setJarByClass(DistRaid.class);
- jobconf.setInputFormat(DistRaidInputFormat.class);
- jobconf.setOutputKeyClass(Text.class);
- jobconf.setOutputValueClass(Text.class);
-
- jobconf.setMapperClass(DistRaidMapper.class);
- jobconf.setNumReduceTasks(0);
- return jobconf;
+ static Job createJob(Configuration jobConf) throws IOException {
+ String jobName = NAME + " " + dateForm.format(new Date(RaidNode.now()));
+
+ setSchedulerOption(jobConf);
+
+ Job job = Job.getInstance(jobConf, jobName);
+ job.setSpeculativeExecution(false);
+ job.setJarByClass(DistRaid.class);
+ job.setInputFormatClass(DistRaidInputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setMapperClass(DistRaidMapper.class);
+ job.setNumReduceTasks(0);
+
+ return job;
}
/** Add paths to be raided */
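The createJob() rewrite shows the matching driver-side idiom:
Job.getInstance(conf, name) replaces constructing a JobConf, and the class and
configuration setters move onto the Job object. A minimal sketch, reusing the
illustrative ExampleMapper above:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class ExampleDriver {
      static Job createJob(Configuration conf) throws IOException {
        Job job = Job.getInstance(conf, "example");
        job.setSpeculativeExecution(false);      // was setMapSpeculativeExecution(false)
        job.setJarByClass(ExampleDriver.class);
        job.setMapperClass(ExampleMapper.class); // the illustrative mapper above
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0);                // map-only, as in DistRaid
        return job;
      }
    }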
@@ -294,25 +250,24 @@ public class DistRaid {
raidPolicyPathPairList.add(new RaidPolicyPathPair(info, paths));
}
- /** Calculate how many maps to run. */
- private static int getMapCount(int srcCount, int numNodes) {
- int numMaps = (int) (srcCount / OP_PER_MAP);
- numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
- return Math.max(numMaps, MAX_MAPS_PER_NODE);
- }
-
/** Invokes a map-reduce job do parallel raiding.
* @return true if the job was started, false otherwise
+ * @throws InterruptedException
*/
public boolean startDistRaid() throws IOException {
assert(raidPolicyPathPairList.size() > 0);
- if (setup()) {
- this.jobClient = new JobClient(jobconf);
- this.runningJob = this.jobClient.submitJob(jobconf);
- LOG.info("Job Started: " + runningJob.getID());
+ Job job = createJob(getConf());
+ createInputFile(job);
+ try {
+ job.submit();
+ this.runningJob = job;
+ LOG.info("Job Started: " + runningJob.getJobID());
return true;
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ } catch (InterruptedException e) {
+ return false;
}
- return false;
}
/** Checks if the map-reduce job has completed.
@@ -321,76 +276,74 @@ public class DistRaid {
* @throws IOException
*/
public boolean checkComplete() throws IOException {
- JobID jobID = runningJob.getID();
- if (runningJob.isComplete()) {
- // delete job directory
- final String jobdir = jobconf.get(JOB_DIR_LABEL);
- if (jobdir != null) {
- final Path jobpath = new Path(jobdir);
- jobpath.getFileSystem(jobconf).delete(jobpath, true);
- }
- if (runningJob.isSuccessful()) {
- LOG.info("Job Complete(Succeeded): " + jobID);
+ JobID jobID = runningJob.getJobID();
+ try {
+ if (runningJob.isComplete()) {
+ // delete job directory
+ Configuration jobConf = runningJob.getConfiguration();
+ final String jobdir = jobConf.get(JOB_DIR_LABEL);
+ if (jobdir != null) {
+ final Path jobpath = new Path(jobdir);
+ jobpath.getFileSystem(jobConf).delete(jobpath, true);
+ }
+ if (runningJob.isSuccessful()) {
+ LOG.info("Job Complete(Succeeded): " + jobID);
+ } else {
+ LOG.info("Job Complete(Failed): " + jobID);
+ }
+ raidPolicyPathPairList.clear();
+ return true;
} else {
- LOG.info("Job Complete(Failed): " + jobID);
- }
- raidPolicyPathPairList.clear();
- return true;
- } else {
- String report = (" job " + jobID +
- " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)+
- " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
- if (!report.equals(lastReport)) {
- LOG.info(report);
- lastReport = report;
- }
- TaskCompletionEvent[] events =
- runningJob.getTaskCompletionEvents(jobEventCounter);
- jobEventCounter += events.length;
- for(TaskCompletionEvent event : events) {
- if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
- LOG.info(" Job " + jobID + " " + event.toString());
+ String report = (" job " + jobID +
+ " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)+
+ " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
+ if (!report.equals(lastReport)) {
+ LOG.info(report);
+ lastReport = report;
}
+ return false;
}
- return false;
- }
+ } catch (InterruptedException e) {
+ return false;
+ }
}
public boolean successful() throws IOException {
- return runningJob.isSuccessful();
+ try {
+ return runningJob.isSuccessful();
+ } catch (InterruptedException e) {
+ return false;
+ }
}
/**
* set up input file which has the list of input files.
- *
+ *
* @return boolean
* @throws IOException
*/
- private boolean setup() throws IOException {
- final String randomId = getRandomId();
- JobClient jClient = new JobClient(jobconf);
- Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
-
- LOG.info(JOB_DIR_LABEL + "=" + jobdir);
- jobconf.set(JOB_DIR_LABEL, jobdir.toString());
- Path log = new Path(jobdir, "_logs");
+ private void createInputFile(Job job) throws IOException {
+ Configuration jobConf = job.getConfiguration();
+ Path jobDir = new Path(JOB_DIR_LABEL + getRandomId());
+ Path inDir = new Path(jobDir, "in");
+ Path outDir = new Path(jobDir, "out");
+ FileInputFormat.setInputPaths(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
+ Path opList = new Path(inDir, NAME);
+ Configuration tmp = new Configuration(jobConf);
// The control file should have small size blocks. This helps
// in spreading out the load from mappers that will be spawned.
- jobconf.setInt("dfs.blocks.size", OP_LIST_BLOCK_SIZE);
+ tmp.setInt("dfs.blocks.size", OP_LIST_BLOCK_SIZE);
+ FileSystem fs = opList.getFileSystem(tmp);
- FileOutputFormat.setOutputPath(jobconf, log);
- LOG.info("log=" + log);
-
- // create operation list
- FileSystem fs = jobdir.getFileSystem(jobconf);
- Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
- jobconf.set(OP_LIST_LABEL, opList.toString());
int opCount = 0, synCount = 0;
SequenceFile.Writer opWriter = null;
try {
- opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
- PolicyInfo.class, SequenceFile.CompressionType.NONE);
+ opWriter = SequenceFile.createWriter(
+ jobConf, Writer.file(opList), Writer.keyClass(Text.class),
+ Writer.valueClass(PolicyInfo.class),
+ Writer.compression(SequenceFile.CompressionType.NONE));
for (RaidPolicyPathPair p : raidPolicyPathPairList) {
// If a large set of files are Raided for the first time, files
// in the same directory that tend to have the same size will end up
@@ -411,16 +364,10 @@ public class DistRaid {
if (opWriter != null) {
opWriter.close();
}
- fs.setReplication(opList, OP_LIST_REPLICATION); // increase replication for control file
+ // increase replication for control file
+ fs.setReplication(opList, OP_LIST_REPLICATION);
}
raidPolicyPathPairList.clear();
-
- jobconf.setInt(OP_COUNT_LABEL, opCount);
LOG.info("Number of files=" + opCount);
- jobconf.setNumMapTasks(getMapCount(opCount, new JobClient(jobconf)
- .getClusterStatus().getTaskTrackers()));
- LOG.info("jobName= " + jobName + " numMapTasks=" + jobconf.getNumMapTasks());
- return opCount != 0;
-
}
}
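Two smaller moves in this file follow the same pattern. SequenceFile writers
are now assembled from Writer.Option values instead of a FileSystem handle, and
Job.isComplete()/isSuccessful() declare InterruptedException, which the patch
treats as "not finished". A minimal sketch of the writer idiom, with a
hypothetical path (not the committed code):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.Writer;
    import org.apache.hadoop.io.Text;

    public class ExampleWriter {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path opList = new Path("/tmp/example-oplist"); // hypothetical path
        // Option-based factory; replaces createWriter(fs, conf, path, ...).
        SequenceFile.Writer w = SequenceFile.createWriter(conf,
            Writer.file(opList),
            Writer.keyClass(Text.class),
            Writer.valueClass(Text.class),
            Writer.compression(SequenceFile.CompressionType.NONE));
        try {
          w.append(new Text("key"), new Text("value"));
        } finally {
          w.close();
        }
      }
    }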
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java?rev=1037944&r1=1037943&r2=1037944&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java Mon Nov 22 23:44:34 2010
@@ -21,14 +21,7 @@ import java.io.File;
import java.io.FileWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.GregorianCalendar;
-import java.util.Iterator;
import java.util.List;
-import java.util.Properties;
import java.util.Random;
import java.util.zip.CRC32;
@@ -45,7 +38,6 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
@@ -101,13 +93,15 @@ public class TestRaidNode extends TestCa
conf.set("raid.server.address", "localhost:0");
// create a dfs and map-reduce cluster
- final int taskTrackers = 4;
- final int jobTrackerPort = 60050;
-
- dfs = new MiniDFSCluster(conf, 6, true, null);
+ MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf);
+ builder.numDataNodes(6);
+ builder.format(true);
+ dfs = builder.build();
dfs.waitActive();
fileSys = dfs.getFileSystem();
+
namenode = fileSys.getUri().toString();
+ final int taskTrackers = 4;
mr = new MiniMRCluster(taskTrackers, namenode, 3);
jobTrackerName = "localhost:" + mr.getJobTrackerPort();
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
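The cluster setup above swaps the multi-argument MiniDFSCluster constructor for
its Builder. A minimal self-contained sketch of the same calls (illustrative,
not the test code):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class ExampleCluster {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Replaces "new MiniDFSCluster(conf, 6, true, null)".
        MiniDFSCluster dfs = new MiniDFSCluster.Builder(conf)
            .numDataNodes(6) // was the second constructor argument
            .format(true)    // was the third constructor argument
            .build();
        dfs.waitActive();
        dfs.shutdown();
      }
    }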
@@ -291,7 +285,6 @@ public class TestRaidNode extends TestCa
// create an instance of the RaidNode
cnode = RaidNode.createRaidNode(null, conf);
-
FileStatus[] listPaths = null;
// wait till file is raided
@@ -368,14 +361,15 @@ public class TestRaidNode extends TestCa
// greater than or equal to the specified value
private void doCheckPolicy() throws Exception {
LOG.info("doCheckPolicy started---------------------------:");
- short srcReplication = 3;
+ short srcReplication = 1;
long targetReplication = 2;
long metaReplication = 1;
long stripeLength = 2;
long blockSize = 1024;
int numBlock = 3;
ConfigBuilder cb = new ConfigBuilder();
- cb.addPolicy("policy1", "/user/dhruba/policytest", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.addPolicy("policy1", "/user/dhruba/policytest", srcReplication,
+ targetReplication, metaReplication, stripeLength);
cb.persist();
Path dir = new Path("/user/dhruba/policytest/");
Path file1 = new Path(dir + "/file1");
@@ -392,7 +386,7 @@ public class TestRaidNode extends TestCa
cnode = RaidNode.createRaidNode(null, localConf);
// this file should be picked up RaidNode
- long crc2 = createOldFile(fileSys, file2, 2, numBlock, blockSize);
+ createOldFile(fileSys, file2, 2, numBlock, blockSize);
FileStatus[] listPaths = null;
long firstmodtime = 0;
@@ -429,7 +423,7 @@ public class TestRaidNode extends TestCa
LOG.info("doCheckPolicy all files found in Raid the first time.");
LOG.info("doCheckPolicy: recreating source file");
- crc2 = createOldFile(fileSys, file2, 2, numBlock, blockSize);
+ createOldFile(fileSys, file2, 2, numBlock, blockSize);
FileStatus st = fileSys.getFileStatus(file2);
assertTrue(st.getModificationTime() > firstmodtime);
@@ -497,8 +491,10 @@ public class TestRaidNode extends TestCa
createClusters(false);
ConfigBuilder cb = new ConfigBuilder();
- cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
- cb.addPolicy("policy2", "/user/dhruba/raidtest2", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.addPolicy("policy1", "/user/dhruba/raidtest",
+ srcReplication, targetReplication, metaReplication, stripeLength);
+ cb.addPolicy("policy2", "/user/dhruba/raidtest2",
+ srcReplication, targetReplication, metaReplication, stripeLength);
cb.persist();
RaidNode cnode = null;
@@ -515,13 +511,15 @@ public class TestRaidNode extends TestCa
for (PolicyInfo p : policyList.getAll()) {
if (p.getName().equals("policy1")) {
Path srcPath = new Path("/user/dhruba/raidtest");
+ FileSystem fs = srcPath.getFileSystem(conf);
assertTrue(p.getSrcPath().equals(
- srcPath.makeQualified(srcPath.getFileSystem(conf))));
+ srcPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())));
} else {
assertTrue(p.getName().equals("policy2"));
Path srcPath = new Path("/user/dhruba/raidtest2");
+ FileSystem fs = srcPath.getFileSystem(conf);
assertTrue(p.getSrcPath().equals(
- srcPath.makeQualified(srcPath.getFileSystem(conf))));
+ srcPath.makeQualified(fs.getUri(), fs.getWorkingDirectory())));
}
assertEquals(targetReplication,
Integer.parseInt(p.getProperty("targetReplication")));
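The makeQualified() change avoids the single-argument
Path.makeQualified(FileSystem) overload by passing the filesystem's URI and
working directory explicitly. A minimal sketch (illustrative helper, not part
of the test):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ExampleQualify {
      public static Path qualify(Path p, Configuration conf) throws IOException {
        FileSystem fs = p.getFileSystem(conf);
        // Was: p.makeQualified(fs)
        return p.makeQualified(fs.getUri(), fs.getWorkingDirectory());
      }
    }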
@@ -542,14 +540,14 @@ public class TestRaidNode extends TestCa
System.currentTimeMillis() - start < MAX_WAITTIME) {
Thread.sleep(1000);
}
- this.assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
+ assertEquals(dcnode.jobMonitor.jobsMonitored(), 2);
start = System.currentTimeMillis();
while (dcnode.jobMonitor.jobsSucceeded() < 2 &&
System.currentTimeMillis() - start < MAX_WAITTIME) {
Thread.sleep(1000);
}
- this.assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
+ assertEquals(dcnode.jobMonitor.jobsSucceeded(), 2);
LOG.info("Test testDistRaid successful.");
@@ -643,7 +641,8 @@ public class TestRaidNode extends TestCa
createClusters(false);
ConfigBuilder cb = new ConfigBuilder();
- cb.addPolicy("policy1", "/user/dhruba/raidtest", (short)1, targetReplication, metaReplication, stripeLength);
+ cb.addPolicy("policy1", "/user/dhruba/raidtest",
+ srcReplication, targetReplication, metaReplication, stripeLength);
cb.persist();
RaidNode cnode = null;
@@ -680,8 +679,8 @@ public class TestRaidNode extends TestCa
System.currentTimeMillis() - start < MAX_WAITTIME) {
Thread.sleep(1000);
}
- this.assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
- this.assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
+ assertEquals(dcnode.jobMonitor.jobsMonitored(), numJobsExpected);
+ assertEquals(dcnode.jobMonitor.jobsSucceeded(), numJobsExpected);
LOG.info("Test testSuspendTraversal successful.");
@@ -695,11 +694,12 @@ public class TestRaidNode extends TestCa
LOG.info("Test testSuspendTraversal completed.");
}
- public void testSchedulerOption() {
+ public void testSchedulerOption() throws IOException {
Configuration conf = new Configuration();
conf.set("raid.scheduleroption",
"mapred.fairscheduler.pool:dummy,foo:bar");
- org.apache.hadoop.mapred.JobConf jobConf = DistRaid.createJobConf(conf);
+ org.apache.hadoop.mapreduce.Job job = DistRaid.createJob(conf);
+ Configuration jobConf = job.getConfiguration();
assertEquals("dummy", jobConf.get("mapred.fairscheduler.pool"));
assertEquals("bar", jobConf.get("foo"));
}