Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [3/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh Sat Nov 28 20:26:01 2009
@@ -29,51 +29,51 @@
${HADOOP_HOME}/bin/hadoop jar \
${EXAMPLE_JAR} randomtextwriter \
- -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
- -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
- -D test.randomtextwrite.min_words_key=5 \
- -D test.randomtextwrite.max_words_key=10 \
- -D test.randomtextwrite.min_words_value=100 \
- -D test.randomtextwrite.max_words_value=10000 \
- -D mapred.output.compress=true \
+ -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+ -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+ -D mapreduce.randomtextwriter.minwordskey=5 \
+ -D mapreduce.randomtextwriter.maxwordskey=10 \
+ -D mapreduce.randomtextwriter.minwordsvalue=100 \
+ -D mapreduce.randomtextwriter.maxwordsvalue=10000 \
+ -D mapreduce.output.fileoutputformat.compress=true \
-D mapred.map.output.compression.type=BLOCK \
- -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
+ -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
${VARCOMPSEQ} &
${HADOOP_HOME}/bin/hadoop jar \
${EXAMPLE_JAR} randomtextwriter \
- -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
- -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
- -D test.randomtextwrite.min_words_key=5 \
- -D test.randomtextwrite.max_words_key=5 \
- -D test.randomtextwrite.min_words_value=100 \
- -D test.randomtextwrite.max_words_value=100 \
- -D mapred.output.compress=true \
+ -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+ -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+ -D mapreduce.randomtextwriter.minwordskey=5 \
+ -D mapreduce.randomtextwriter.maxwordskey=5 \
+ -D mapreduce.randomtextwriter.minwordsvalue=100 \
+ -D mapreduce.randomtextwriter.maxwordsvalue=100 \
+ -D mapreduce.output.fileoutputformat.compress=true \
-D mapred.map.output.compression.type=BLOCK \
- -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
+ -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
${FIXCOMPSEQ} &
${HADOOP_HOME}/bin/hadoop jar \
${EXAMPLE_JAR} randomtextwriter \
- -D test.randomtextwrite.total_bytes=${UNCOMPRESSED_DATA_BYTES} \
- -D test.randomtextwrite.bytes_per_map=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
- -D test.randomtextwrite.min_words_key=1 \
- -D test.randomtextwrite.max_words_key=10 \
- -D test.randomtextwrite.min_words_value=0 \
- -D test.randomtextwrite.max_words_value=200 \
- -D mapred.output.compress=false \
- -outFormat org.apache.hadoop.mapred.TextOutputFormat \
+ -D mapreduce.randomtextwriter.totalbytes=${UNCOMPRESSED_DATA_BYTES} \
+ -D mapreduce.randomtextwriter.bytespermap=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+ -D mapreduce.randomtextwriter.minwordskey=1 \
+ -D mapreduce.randomtextwriter.maxwordskey=10 \
+ -D mapreduce.randomtextwriter.minwordsvalue=0 \
+ -D mapreduce.randomtextwriter.maxwordsvalue=200 \
+ -D mapreduce.output.fileoutputformat.compress=false \
+ -outFormat org.apache.hadoop.mapreduce.lib.output.TextOutputFormat \
${VARINFLTEXT} &
${HADOOP_HOME}/bin/hadoop jar \
${EXAMPLE_JAR} randomtextwriter \
- -D test.randomtextwrite.total_bytes=${INDIRECT_DATA_BYTES} \
- -D test.randomtextwrite.bytes_per_map=$((${INDIRECT_DATA_BYTES} / ${INDIRECT_DATA_FILES})) \
- -D test.randomtextwrite.min_words_key=5 \
- -D test.randomtextwrite.max_words_key=5 \
- -D test.randomtextwrite.min_words_value=20 \
- -D test.randomtextwrite.max_words_value=20 \
- -D mapred.output.compress=true \
+ -D mapreduce.randomtextwriter.totalbytes=${INDIRECT_DATA_BYTES} \
+ -D mapreduce.randomtextwriter.bytespermap=$((${INDIRECT_DATA_BYTES} / ${INDIRECT_DATA_FILES})) \
+ -D mapreduce.randomtextwriter.minwordskey=5 \
+ -D mapreduce.randomtextwriter.maxwordskey=5 \
+ -D mapreduce.randomtextwriter.minwordsvalue=20 \
+ -D mapreduce.randomtextwriter.maxwordsvalue=20 \
+ -D mapreduce.output.fileoutputformat.compress=true \
-D mapred.map.output.compression.type=BLOCK \
- -outFormat org.apache.hadoop.mapred.TextOutputFormat \
+ -outFormat org.apache.hadoop.mapreduce.lib.output.TextOutputFormat \
${FIXCOMPTEXT} &
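
[Note] The data-generation script above now sets the renamed randomtextwriter and output-compression keys. As a reference for porting similar driver scripts, here is a small illustrative C lookup; it is not part of this commit, the helper name translate_key is invented, and only the key pairs themselves come from the diff:

/* Illustrative sketch: map the deprecated gridmix data-generation keys to the
 * new names used in generateData.sh above. */
#include <stdio.h>
#include <string.h>

static const char *const KEY_MAP[][2] = {
  { "test.randomtextwrite.total_bytes",     "mapreduce.randomtextwriter.totalbytes"      },
  { "test.randomtextwrite.bytes_per_map",   "mapreduce.randomtextwriter.bytespermap"     },
  { "test.randomtextwrite.min_words_key",   "mapreduce.randomtextwriter.minwordskey"     },
  { "test.randomtextwrite.max_words_key",   "mapreduce.randomtextwriter.maxwordskey"     },
  { "test.randomtextwrite.min_words_value", "mapreduce.randomtextwriter.minwordsvalue"   },
  { "test.randomtextwrite.max_words_value", "mapreduce.randomtextwriter.maxwordsvalue"   },
  { "mapred.output.compress",               "mapreduce.output.fileoutputformat.compress" },
};

/* Return the new key name, or the input unchanged if it is not deprecated. */
static const char *translate_key(const char *key) {
  size_t i;
  for (i = 0; i < sizeof(KEY_MAP) / sizeof(KEY_MAP[0]); i++) {
    if (strcmp(key, KEY_MAP[i][0]) == 0) {
      return KEY_MAP[i][1];
    }
  }
  return key;
}

int main(void) {
  printf("%s\n", translate_key("mapred.output.compress"));
  return 0;
}
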
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large Sat Nov 28 20:26:01 2009
@@ -12,5 +12,5 @@
${HADOOP_HOME}/bin/hadoop dfs -rmr $OUTDIR
-${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapred.output.key.class=org.apache.hadoop.io.Text,mapred.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
+${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapreduce.job.output.key.class=org.apache.hadoop.io.Text,mapreduce.job.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium Sat Nov 28 20:26:01 2009
@@ -12,5 +12,5 @@
${HADOOP_HOME}/bin/hadoop dfs -rmr $OUTDIR
-${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapred.output.key.class=org.apache.hadoop.io.Text,mapred.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
+${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapreduce.job.output.key.class=org.apache.hadoop.io.Text,mapreduce.job.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small Sat Nov 28 20:26:01 2009
@@ -12,5 +12,5 @@
${HADOOP_HOME}/bin/hadoop dfs -rmr $OUTDIR
-${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapred.output.key.class=org.apache.hadoop.io.Text,mapred.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
+${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapreduce.job.output.key.class=org.apache.hadoop.io.Text,mapreduce.job.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
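
[Note] All three pipesort drivers now pass the job's output key/value classes under the new mapreduce.job.output.* names. A minimal sketch, assuming only the key names shown in the diffs above (the wrapper itself is hypothetical), of how the comma-separated -jobconf argument could be assembled programmatically:

#include <stdio.h>

int main(void) {
  char jobconf[256];
  snprintf(jobconf, sizeof(jobconf), "%s=%s,%s=%s",
           "mapreduce.job.output.key.class",   "org.apache.hadoop.io.Text",
           "mapreduce.job.output.value.class", "org.apache.hadoop.io.Text");
  /* Passed as: hadoop pipes ... -jobconf <jobconf> -writer ... */
  printf("-jobconf %s\n", jobconf);
  return 0;
}
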
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh Sat Nov 28 20:26:01 2009
@@ -53,13 +53,13 @@
${HADOOP_HOME}/bin/hadoop jar \
${EXAMPLE_JAR} randomtextwriter \
- -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
- -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
- -D test.randomtextwrite.min_words_key=5 \
- -D test.randomtextwrite.max_words_key=10 \
- -D test.randomtextwrite.min_words_value=100 \
- -D test.randomtextwrite.max_words_value=10000 \
- -D mapred.output.compress=true \
+ -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+ -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+ -D mapreduce.randomtextwriter.minwordskey=5 \
+ -D mapreduce.randomtextwriter.maxwordskey=10 \
+ -D mapreduce.randomtextwriter.minwordsvalue=100 \
+ -D mapreduce.randomtextwriter.maxwordsvalue=10000 \
+ -D mapreduce.output.fileoutputformat.compress=true \
-D mapred.map.output.compression.type=BLOCK \
-outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
${VARCOMPSEQ} &
@@ -67,13 +67,13 @@
${HADOOP_HOME}/bin/hadoop jar \
${EXAMPLE_JAR} randomtextwriter \
- -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
- -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
- -D test.randomtextwrite.min_words_key=5 \
- -D test.randomtextwrite.max_words_key=5 \
- -D test.randomtextwrite.min_words_value=100 \
- -D test.randomtextwrite.max_words_value=100 \
- -D mapred.output.compress=true \
+ -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+ -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+ -D mapreduce.randomtextwriter.minwordskey=5 \
+ -D mapreduce.randomtextwriter.maxwordskey=5 \
+ -D mapreduce.randomtextwriter.minwordsvalue=100 \
+ -D mapreduce.randomtextwriter.maxwordsvalue=100 \
+ -D mapreduce.output.fileoutputformat.compress=true \
-D mapred.map.output.compression.type=BLOCK \
-outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
${FIXCOMPSEQ} &
@@ -81,13 +81,13 @@
${HADOOP_HOME}/bin/hadoop jar \
${EXAMPLE_JAR} randomtextwriter \
- -D test.randomtextwrite.total_bytes=${UNCOMPRESSED_DATA_BYTES} \
- -D test.randomtextwrite.bytes_per_map=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
- -D test.randomtextwrite.min_words_key=1 \
- -D test.randomtextwrite.max_words_key=10 \
- -D test.randomtextwrite.min_words_value=0 \
- -D test.randomtextwrite.max_words_value=200 \
- -D mapred.output.compress=false \
+ -D mapreduce.randomtextwriter.totalbytes=${UNCOMPRESSED_DATA_BYTES} \
+ -D mapreduce.randomtextwriter.bytespermap=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+ -D mapreduce.randomtextwriter.minwordskey=1 \
+ -D mapreduce.randomtextwriter.maxwordskey=10 \
+ -D mapreduce.randomtextwriter.minwordsvalue=0 \
+ -D mapreduce.randomtextwriter.maxwordsvalue=200 \
+ -D mapreduce.output.fileoutputformat.compress=false \
-outFormat org.apache.hadoop.mapreduce.lib.output.TextOutputFormat \
${VARINFLTEXT} &
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml Sat Nov 28 20:26:01 2009
@@ -39,7 +39,7 @@
<property>
<name>streamSort.smallJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
@@ -74,7 +74,7 @@
</property>
<property>
<name>streamSort.mediumJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -131,7 +131,7 @@
</property>
<property>
<name>javaSort.smallJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -160,7 +160,7 @@
</property>
<property>
<name>javaSort.mediumJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -217,7 +217,7 @@
</property>
<property>
<name>combiner.smallJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -246,7 +246,7 @@
</property>
<property>
<name>combiner.mediumJobs.inputFiles</name>
- <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -303,7 +303,7 @@
</property>
<property>
<name>monsterQuery.smallJobs.inputFiles</name>
- <value>${FIXCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+ <value>${FIXCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -332,7 +332,7 @@
</property>
<property>
<name>monsterQuery.mediumJobs.inputFiles</name>
- <value>${FIXCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${FIXCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -390,7 +390,7 @@
<property>
<name>webdataScan.smallJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -413,7 +413,7 @@
<property>
<name>webdataScan.mediumJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
@@ -469,7 +469,7 @@
</property>
<property>
<name>webdataSort.smallJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+ <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
<description></description>
</property>
<property>
@@ -498,7 +498,7 @@
</property>
<property>
<name>webdataSort.mediumJobs.inputFiles</name>
- <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+ <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
<description></description>
</property>
<property>
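
[Note] The inputFiles globs above change from part-000NN to part-*-000NN to match the part-m-NNNNN / part-r-NNNNN names produced by the new org.apache.hadoop.mapreduce output formats used when the input data is regenerated. A quick standalone check with fnmatch (illustrative only; the {a,b,c} grouping in the real config is handled by Hadoop's own path globbing, so only single alternatives are tested here):

#include <fnmatch.h>
#include <stdio.h>

int main(void) {
  const char *pattern = "part-*-00000";
  const char *names[] = { "part-m-00000", "part-r-00000", "part-00000" };
  size_t i;
  for (i = 0; i < sizeof(names) / sizeof(names[0]); i++) {
    printf("%-12s %s %s\n", names[i],
           fnmatch(pattern, names[i], 0) == 0 ? "matches" : "does not match",
           pattern);
  }
  return 0;
}
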
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2 (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2 Sat Nov 28 20:26:01 2009
@@ -30,7 +30,7 @@
export HADOOP_CLASSPATH=${APP_JAR}:${EXAMPLE_JAR}:${STREAMING_JAR}
export LIBJARS=${APP_JAR},${EXAMPLE_JAR},${STREAMING_JAR}
-${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapred.GridMixRunner -libjars ${LIBJARS}
+${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapreduce.GridMixRunner -libjars ${LIBJARS}
Date=`date +%F-%H-%M-%S-%N`
echo $Date > $1_end.out
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/c++:713112
/hadoop/core/trunk/src/c++:776175-784663
-/hadoop/mapreduce/trunk/src/c++:804974-807678
+/hadoop/mapreduce/trunk/src/c++:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/pipes/impl/HadoopPipes.cc Sat Nov 28 20:26:01 2009
@@ -701,8 +701,8 @@
}
if (reducer != NULL) {
int64_t spillSize = 100;
- if (jobConf->hasKey("io.sort.mb")) {
- spillSize = jobConf->getInt("io.sort.mb");
+ if (jobConf->hasKey("mapreduce.task.io.sort.mb")) {
+ spillSize = jobConf->getInt("mapreduce.task.io.sort.mb");
}
writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer,
uplink, partitioner, numReduces);
@@ -937,7 +937,7 @@
*/
void* ping(void* ptr) {
TaskContextImpl* context = (TaskContextImpl*) ptr;
- char* portStr = getenv("hadoop.pipes.command.port");
+ char* portStr = getenv("mapreduce.pipes.command.port");
int MAX_RETRIES = 3;
int remaining_retries = MAX_RETRIES;
while (!context->isDone()) {
@@ -990,7 +990,7 @@
try {
TaskContextImpl* context = new TaskContextImpl(factory);
Protocol* connection;
- char* portStr = getenv("hadoop.pipes.command.port");
+ char* portStr = getenv("mapreduce.pipes.command.port");
int sock = -1;
FILE* stream = NULL;
FILE* outStream = NULL;
@@ -1024,8 +1024,8 @@
+ strerror(errno));
connection = new BinaryProtocol(stream, context, outStream);
- } else if (getenv("hadoop.pipes.command.file")) {
- char* filename = getenv("hadoop.pipes.command.file");
+ } else if (getenv("mapreduce.pipes.commandfile")) {
+ char* filename = getenv("mapreduce.pipes.commandfile");
string outFilename = filename;
outFilename += ".out";
stream = fopen(filename, "r");
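
[Note] The pipes runtime now reads its command port and command file from the renamed mapreduce.pipes.command.port and mapreduce.pipes.commandfile environment variables, and the combiner spill threshold from mapreduce.task.io.sort.mb. A hypothetical fallback lookup, not part of this commit and with only the variable names taken from the change above, that tolerates either name during a mixed-version transition:

#include <stdio.h>
#include <stdlib.h>

static int resolve_command_port(void) {
  const char *port = getenv("mapreduce.pipes.command.port");
  if (port == NULL) {
    port = getenv("hadoop.pipes.command.port");   /* legacy name */
  }
  return port != NULL ? atoi(port) : -1;          /* -1: not running under pipes */
}

int main(void) {
  printf("command port: %d\n", resolve_command_port());
  return 0;
}
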
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/main.c?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/main.c Sat Nov 28 20:26:01 2009
@@ -105,10 +105,16 @@
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
switch (command) {
+ case INITIALIZE_USER:
+ exit_code = initialize_user(user_detail->pw_name);
+ break;
case INITIALIZE_JOB:
job_id = argv[optind++];
exit_code = initialize_job(job_id, user_detail->pw_name);
break;
+ case INITIALIZE_DISTRIBUTEDCACHE:
+ exit_code = initialize_distributed_cache(user_detail->pw_name);
+ break;
case LAUNCH_TASK_JVM:
tt_root = argv[optind++];
job_id = argv[optind++];
@@ -129,6 +135,13 @@
task_pid = argv[optind++];
exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
break;
+ case RUN_DEBUG_SCRIPT:
+ tt_root = argv[optind++];
+ job_id = argv[optind++];
+ task_id = argv[optind++];
+ exit_code
+ = run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root);
+ break;
default:
exit_code = INVALID_COMMAND_PROVIDED;
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/task-controller.c?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.c Sat Nov 28 20:26:01 2009
@@ -120,16 +120,26 @@
/**
* Utility function to concatenate argB to argA using the concat_pattern
*/
-char *concatenate(const char *argA, const char *argB, char *concat_pattern,
- char *return_path_name) {
- if (argA == NULL || argB == NULL) {
- fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
- return_path_name);
- return NULL;
+char *concatenate(char *concat_pattern, char *return_path_name, int numArgs,
+ ...) {
+ va_list ap;
+ va_start(ap, numArgs);
+ int strlen_args = 0;
+ char *arg = NULL;
+ int j;
+ for (j = 0; j < numArgs; j++) {
+ arg = va_arg(ap, char*);
+ if (arg == NULL) {
+ fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+ return_path_name);
+ return NULL;
+ }
+ strlen_args += strlen(arg);
}
+ va_end(ap);
char *return_path = NULL;
- int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB);
+ int str_len = strlen(concat_pattern) + strlen_args;
return_path = (char *) malloc(sizeof(char) * (str_len + 1));
if (return_path == NULL) {
@@ -137,47 +147,66 @@
return NULL;
}
memset(return_path, '\0', str_len + 1);
- snprintf(return_path, str_len, concat_pattern, argA, argB);
+ va_start(ap, numArgs);
+ vsnprintf(return_path, str_len, concat_pattern, ap);
+ va_end(ap);
return return_path;
}
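+/*
+ * Usage sketch for the new variadic signature (not part of this commit; the
+ * sample argument values are invented, and TT_JOB_DIR_PATTERN expands to
+ * "%s/taskTracker/%s/jobcache/%s" per task-controller.h below):
+ *
+ *   char *job_dir = concatenate(TT_JOB_DIR_PATTERN, "job_dir_path", 3,
+ *                               "/tmp/mapred/local", "alice",
+ *                               "job_200911280001_0001");
+ *   // job_dir == "/tmp/mapred/local/taskTracker/alice/jobcache/job_200911280001_0001"
+ *   free(job_dir);  // caller owns the returned buffer
+ */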
/**
- * Get the job-directory path from tt_root and job-id
+ * Get the job-directory path from tt_root, user name and job-id
+ */
+char *get_job_directory(const char * tt_root, const char *user,
+ const char *jobid) {
+ return concatenate(TT_JOB_DIR_PATTERN, "job_dir_path", 3, tt_root, user,
+ jobid);
+}
+
+/**
+ * Get the user directory of a particular user
*/
-char *get_job_directory(const char * tt_root, const char *jobid) {
- return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path");
+char *get_user_directory(const char *tt_root, const char *user) {
+ return concatenate(USER_DIR_PATTERN, "user_dir_path", 2, tt_root, user);
+}
+
+/**
+ * Get the distributed cache directory for a particular user
+ */
+char *get_distributed_cache_directory(const char *tt_root, const char *user) {
+ return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, "dist_cache_path", 2,
+ tt_root, user);
}
char *get_job_work_directory(const char *job_dir) {
- return concatenate(job_dir, "", JOB_DIR_TO_JOB_WORK_PATTERN,
- "job_work_dir_path");
+ return concatenate(JOB_DIR_TO_JOB_WORK_PATTERN, "job_work_dir_path", 2,
+ job_dir, "");
}
/**
* Get the attempt directory for the given attempt_id
*/
char *get_attempt_directory(const char *job_dir, const char *attempt_id) {
- return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN,
- "attempt_dir_path");
+ return concatenate(JOB_DIR_TO_ATTEMPT_DIR_PATTERN, "attempt_dir_path", 2,
+ job_dir, attempt_id);
}
/*
* Get the path to the task launcher file which is created by the TT
*/
char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) {
- return concatenate(job_dir, attempt_dir, TASK_SCRIPT_PATTERN,
- "task_script_path");
+ return concatenate(TASK_SCRIPT_PATTERN, "task_script_path", 2, job_dir,
+ attempt_dir);
}
/**
* Get the log directory for the given attempt.
*/
char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
- return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN,
- "task_log_dir");
+ return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
+ attempt_id);
}
/**
- * Function to check if the passed tt_root is present in mapred.local.dir
+ * Function to check if the passed tt_root is present in mapreduce.cluster.local.dir
* the task-controller is configured with.
*/
int check_tt_root(const char *tt_root) {
@@ -332,8 +361,19 @@
if (!process_path) {
continue;
}
+ if (compare_ownership(uid, gid, entry->fts_path) == 0) {
+ // already set proper permissions.
+ // This might happen with distributed cache.
+#ifdef DEBUG
+ fprintf(
+ LOGFILE,
+ "already has private permissions. Not trying to change again for %s",
+ entry->fts_path);
+#endif
+ continue;
+ }
- if (check_ownership(getuid(), getgid(), entry->fts_path) != 0) {
+ if (check_ownership(entry->fts_path) != 0) {
fprintf(LOGFILE,
"Invalid file path. %s not user/group owned by the tasktracker.\n",
entry->fts_path);
@@ -359,8 +399,8 @@
* Function to prepare the attempt directories for the task JVM.
* This is done by changing the ownership of the attempt directory recursively
* to the job owner. We do the following:
- * * sudo chown user:mapred -R taskTracker/jobcache/$jobid/$attemptid/
- * * sudo chmod 2770 -R taskTracker/jobcache/$jobid/$attemptid/
+ * * sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid/$attemptid/
+ * * sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid/$attemptid/
*/
int prepare_attempt_directories(const char *job_id, const char *attempt_id,
const char *user) {
@@ -369,13 +409,13 @@
return INVALID_ARGUMENT_NUMBER;
}
+ gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
if (get_user_details(user) < 0) {
fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
return INVALID_USER_NAME;
}
- int tasktracker_gid = getgid();
-
char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
if (local_dir == NULL) {
@@ -395,14 +435,14 @@
char **local_dir_ptr = local_dir;
int failed = 0;
while (*local_dir_ptr != NULL) {
- job_dir = get_job_directory(*local_dir_ptr, job_id);
+ job_dir = get_job_directory(*local_dir_ptr, user, job_id);
if (job_dir == NULL) {
fprintf(LOGFILE, "Couldn't get job directory for %s.\n", job_id);
failed = 1;
break;
}
- // prepare attempt-dir in each of the mapred_local_dir
+ // prepare attempt-dir in each of the mapreduce.cluster.local.dir
attempt_dir = get_attempt_directory(job_dir, attempt_id);
if (attempt_dir == NULL) {
fprintf(LOGFILE, "Couldn't get attempt directory for %s.\n", attempt_id);
@@ -485,7 +525,7 @@
}
}
- int tasktracker_gid = getgid();
+ gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU
| S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
// setgid on dirs but not files, 770. As of now, there are no files though
@@ -507,22 +547,122 @@
return 0;
}
+/**
+ * Compare ownership of a file with the given ids.
+ */
+int compare_ownership(uid_t uid, gid_t gid, char *path) {
+ struct stat filestat;
+ if (stat(path, &filestat) != 0) {
+ return UNABLE_TO_STAT_FILE;
+ }
+ if (uid == filestat.st_uid && gid == filestat.st_gid) {
+ return 0;
+ }
+ return 1;
+}
+
/*
- * Function to check if a user/group actually owns the file.
+ * Function to check if the TaskTracker actually owns the file.
*/
-int check_ownership(uid_t uid, gid_t gid, char *path) {
+int check_ownership(char *path) {
struct stat filestat;
if (stat(path, &filestat) != 0) {
return UNABLE_TO_STAT_FILE;
}
- // check user/group.
- if (uid != filestat.st_uid || gid != filestat.st_gid) {
+ // check user/group. User should be TaskTracker user, group can either be
+ // TaskTracker's primary group or the special group to which binary's
+ // permissions are set.
+ if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
+ != filestat.st_gid)) {
return FILE_NOT_OWNED_BY_TASKTRACKER;
}
return 0;
}
/**
+ * Function to initialize the user directories of a user.
+ * It does the following:
+ * * sudo chown user:mapred -R taskTracker/$user
+ * * sudo chmod 2570 -R taskTracker/$user
+ * This is done once per every user on the TaskTracker.
+ */
+int initialize_user(const char *user) {
+
+ if (user == NULL) {
+ fprintf(LOGFILE, "user passed is null.\n");
+ return INVALID_ARGUMENT_NUMBER;
+ }
+
+ if (get_user_details(user) < 0) {
+ fprintf(LOGFILE, "Couldn't get the user details of %s", user);
+ return INVALID_USER_NAME;
+ }
+
+ gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+ char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+ if (local_dir == NULL) {
+ fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+ cleanup();
+ return INVALID_TT_ROOT;
+ }
+
+ char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+ fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+ full_local_dir_str);
+#endif
+
+ char *user_dir;
+ char **local_dir_ptr = local_dir;
+ int failed = 0;
+ while (*local_dir_ptr != NULL) {
+ user_dir = get_user_directory(*local_dir_ptr, user);
+ if (user_dir == NULL) {
+ fprintf(LOGFILE, "Couldn't get userdir directory for %s.\n", user);
+ failed = 1;
+ break;
+ }
+
+ struct stat filestat;
+ if (stat(user_dir, &filestat) != 0) {
+ if (errno == ENOENT) {
+#ifdef DEBUG
+ fprintf(LOGFILE, "user_dir %s doesn't exist. Not doing anything.\n",
+ user_dir);
+#endif
+ } else {
+ // stat failed because of something else!
+ fprintf(LOGFILE, "Failed to stat the user_dir %s\n",
+ user_dir);
+ failed = 1;
+ free(user_dir);
+ break;
+ }
+ } else if (secure_path(user_dir, user_detail->pw_uid, tasktracker_gid,
+ S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+ != 0) {
+ // No setgid on files and setgid on dirs, 570
+ fprintf(LOGFILE, "Failed to secure the user_dir %s\n",
+ user_dir);
+ failed = 1;
+ free(user_dir);
+ break;
+ }
+
+ local_dir_ptr++;
+ free(user_dir);
+ }
+ free(local_dir);
+ free(full_local_dir_str);
+ cleanup();
+ if (failed) {
+ return INITIALIZE_USER_FAILED;
+ }
+ return 0;
+}
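+/*
+ * Note on the mode bits passed to secure_path() above (standard <sys/stat.h>
+ * values; arithmetic only, not part of this commit):
+ *   files:       S_IRUSR | S_IXUSR | S_IRWXG           = 0400|0100|0070 = 0570
+ *   directories: S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG = 02000|0570     = 02570
+ * which matches the "sudo chmod 2570 -R taskTracker/$user" description in the
+ * function comment.
+ */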
+
+/**
* Function to prepare the job directories for the task JVM.
* We do the following:
* * sudo chown user:mapred -R taskTracker/jobcache/$jobid
@@ -540,7 +680,7 @@
return INVALID_USER_NAME;
}
- gid_t tasktracker_gid = getgid(); // TaskTracker's group-id
+ gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
if (local_dir == NULL) {
@@ -559,7 +699,7 @@
char **local_dir_ptr = local_dir;
int failed = 0;
while (*local_dir_ptr != NULL) {
- job_dir = get_job_directory(*local_dir_ptr, jobid);
+ job_dir = get_job_directory(*local_dir_ptr, user, jobid);
if (job_dir == NULL) {
fprintf(LOGFILE, "Couldn't get job directory for %s.\n", jobid);
failed = 1;
@@ -604,6 +744,7 @@
"job_work_dir %s doesn't exist. Not doing anything.\n",
job_work_dir);
#endif
+ free(job_work_dir);
} else {
// stat failed because of something else!
fprintf(LOGFILE, "Failed to stat the job_work_dir %s\n",
@@ -637,6 +778,97 @@
}
/**
+ * Function to initialize the distributed cache files of a user.
+ * It does the following:
+ * * sudo chown user:mapred -R taskTracker/$user/distcache
+ * * sudo chmod 2570 -R taskTracker/$user/distcache
+ * This is done once per every JVM launch. Tasks reusing JVMs just create
+ * symbolic links themselves and so there isn't anything specific to do in
+ * that case.
+ * Sometimes, it happens that a task uses the whole or part of a directory
+ * structure in taskTracker/$user/distcache. In this case, some paths are
+ * already set proper private permissions by this same function called during
+ * a previous JVM launch. In the current invocation, we only do the
+ * chown/chmod operation of files/directories that are newly created by the
+ * TaskTracker (i.e. those that still are not owned by user:mapred)
+ */
+int initialize_distributed_cache(const char *user) {
+
+ if (user == NULL) {
+ fprintf(LOGFILE, "user passed is null.\n");
+ return INVALID_ARGUMENT_NUMBER;
+ }
+
+ if (get_user_details(user) < 0) {
+ fprintf(LOGFILE, "Couldn't get the user details of %s", user);
+ return INVALID_USER_NAME;
+ }
+
+ gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+ char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+ if (local_dir == NULL) {
+ fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+ cleanup();
+ return INVALID_TT_ROOT;
+ }
+
+ char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+ fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+ full_local_dir_str);
+#endif
+
+ char *distcache_dir;
+ char **local_dir_ptr = local_dir;
+ int failed = 0;
+ while (*local_dir_ptr != NULL) {
+ distcache_dir = get_distributed_cache_directory(*local_dir_ptr, user);
+ if (distcache_dir == NULL) {
+ fprintf(LOGFILE, "Couldn't get distcache directory for %s.\n", user);
+ failed = 1;
+ break;
+ }
+
+ struct stat filestat;
+ if (stat(distcache_dir, &filestat) != 0) {
+ if (errno == ENOENT) {
+#ifdef DEBUG
+ fprintf(LOGFILE, "distcache_dir %s doesn't exist. Not doing anything.\n",
+ distcache_dir);
+#endif
+ } else {
+ // stat failed because of something else!
+ fprintf(LOGFILE, "Failed to stat the distcache_dir %s\n",
+ distcache_dir);
+ failed = 1;
+ free(distcache_dir);
+ break;
+ }
+ } else if (secure_path(distcache_dir, user_detail->pw_uid,
+ tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
+ | S_IXUSR | S_IRWXG) != 0) {
+ // No setgid on files and setgid on dirs, 570
+ fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
+ distcache_dir);
+ failed = 1;
+ free(distcache_dir);
+ break;
+ }
+
+ local_dir_ptr++;
+ free(distcache_dir);
+ }
+ free(local_dir);
+ free(full_local_dir_str);
+ cleanup();
+ if (failed) {
+ return INITIALIZE_DISTCACHE_FAILED;
+ }
+ return 0;
+}
+
+/**
* Function used to initialize task. Prepares attempt_dir, jars_dir and
* log_dir to be accessible by the child
*/
@@ -678,25 +910,41 @@
}
/*
- * Function used to launch a task as the provided user. It does the following :
- * 1) Checks if the tt_root passed is found in mapred.local.dir
- * 2) Prepares attempt_dir and log_dir to be accessible by the child
- * 3) Uses get_task_launcher_file to fetch the task script file path
- * 4) Does an execlp on the same in order to replace the current image with
- * task image.
+ * Function used to launch a task as the provided user.
*/
int run_task_as_user(const char * user, const char *jobid, const char *taskid,
const char *tt_root) {
- int exit_code = 0;
+ return run_process_as_user(user, jobid, taskid, tt_root, LAUNCH_TASK_JVM);
+}
+/*
+ * Function that is used as a helper to launch task JVMs and debug scripts.
+ * Not meant for launching any other process. It does the following :
+ * 1) Checks if the tt_root passed is found in mapreduce.cluster.local.dir
+ * 2) Prepares attempt_dir and log_dir to be accessible by the task JVMs
+ * 3) Uses get_task_launcher_file to fetch the task script file path
+ * 4) Does an execlp on the same in order to replace the current image with
+ * task image.
+ */
+int run_process_as_user(const char * user, const char * jobid,
+const char *taskid, const char *tt_root, int command) {
+ if (command != LAUNCH_TASK_JVM && command != RUN_DEBUG_SCRIPT) {
+ return INVALID_COMMAND_PROVIDED;
+ }
if (jobid == NULL || taskid == NULL || tt_root == NULL) {
return INVALID_ARGUMENT_NUMBER;
}
+
+ if (command == LAUNCH_TASK_JVM) {
+ fprintf(LOGFILE, "run_process_as_user launching a JVM for task :%s.\n", taskid);
+ } else if (command == RUN_DEBUG_SCRIPT) {
+ fprintf(LOGFILE, "run_process_as_user launching a debug script for task :%s.\n", taskid);
+ }
#ifdef DEBUG
- fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
- fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
- fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
+ fprintf(LOGFILE, "Job-id passed to run_process_as_user : %s.\n", jobid);
+ fprintf(LOGFILE, "task-d passed to run_process_as_user : %s.\n", taskid);
+ fprintf(LOGFILE, "tt_root passed to run_process_as_user : %s.\n", tt_root);
#endif
//Check tt_root before switching the user, as reading configuration
@@ -707,15 +955,17 @@
return INVALID_TT_ROOT;
}
+ int exit_code = 0;
char *job_dir = NULL, *task_script_path = NULL;
- if ((exit_code = initialize_task(jobid, taskid, user)) != 0) {
+ if (command == LAUNCH_TASK_JVM &&
+ (exit_code = initialize_task(jobid, taskid, user)) != 0) {
fprintf(LOGFILE, "Couldn't initialise the task %s of user %s.\n", taskid,
user);
goto cleanup;
}
- job_dir = get_job_directory(tt_root, jobid);
+ job_dir = get_job_directory(tt_root, user, jobid);
if (job_dir == NULL) {
fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root);
exit_code = OUT_OF_MEMORY;
@@ -748,9 +998,14 @@
cleanup();
execlp(task_script_path, task_script_path, NULL);
if (errno != 0) {
- fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
free(task_script_path);
- exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+ if (command == LAUNCH_TASK_JVM) {
+ fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
+ exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+ } else if (command == RUN_DEBUG_SCRIPT) {
+ fprintf(LOGFILE, "Couldn't execute the task debug script file: %s", strerror(errno));
+ exit_code = UNABLE_TO_EXECUTE_DEBUG_SCRIPT;
+ }
}
return exit_code;
@@ -766,7 +1021,13 @@
cleanup();
return exit_code;
}
-
+/*
+ * Function used to launch a debug script as the provided user.
+ */
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+ const char *tt_root) {
+ return run_process_as_user(user, jobid, taskid, tt_root, RUN_DEBUG_SCRIPT);
+}
/**
* Function used to terminate/kill a task launched by the user.
* The function sends appropriate signal to the process group
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/task-controller.h?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.h Sat Nov 28 20:26:01 2009
@@ -37,11 +37,14 @@
//command definitions
enum command {
+ INITIALIZE_USER,
INITIALIZE_JOB,
+ INITIALIZE_DISTRIBUTEDCACHE,
LAUNCH_TASK_JVM,
INITIALIZE_TASK,
TERMINATE_TASK_JVM,
KILL_TASK_JVM,
+ RUN_DEBUG_SCRIPT,
};
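+/*
+ * Note (not part of this commit): with C's default enum numbering, the command
+ * codes implied by the ordering above are
+ *   INITIALIZE_USER=0, INITIALIZE_JOB=1, INITIALIZE_DISTRIBUTEDCACHE=2,
+ *   LAUNCH_TASK_JVM=3, INITIALIZE_TASK=4, TERMINATE_TASK_JVM=5,
+ *   KILL_TASK_JVM=6, RUN_DEBUG_SCRIPT=7.
+ * Inserting INITIALIZE_USER and INITIALIZE_DISTRIBUTEDCACHE shifts the codes of
+ * the pre-existing commands, so whatever invokes the task-controller binary
+ * must use this same ordering.
+ */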
enum errorcodes {
@@ -63,9 +66,16 @@
PREPARE_TASK_LOGS_FAILED, //16
INVALID_TT_LOG_DIR, //17
OUT_OF_MEMORY, //18
+ INITIALIZE_DISTCACHE_FAILED, //19
+ INITIALIZE_USER_FAILED, //20
+ UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
};
-#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s"
+#define USER_DIR_PATTERN "%s/taskTracker/%s"
+
+#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"
+
+#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache"
#define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
@@ -75,7 +85,7 @@
#define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
-#define TT_SYS_DIR_KEY "mapred.local.dir"
+#define TT_SYS_DIR_KEY "mapreduce.cluster.local.dir"
#define TT_LOG_DIR_KEY "hadoop.log.dir"
@@ -91,10 +101,17 @@
int run_task_as_user(const char * user, const char *jobid, const char *taskid,
const char *tt_root);
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+ const char *tt_root);
+
+int initialize_user(const char *user);
+
int initialize_task(const char *jobid, const char *taskid, const char *user);
int initialize_job(const char *jobid, const char *user);
+int initialize_distributed_cache(const char *user);
+
int kill_user_task(const char *user, const char *task_pid, int sig);
int prepare_attempt_directory(const char *attempt_dir, const char *user);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/tests/test-task-controller.c (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/tests/test-task-controller.c Sat Nov 28 20:26:01 2009
@@ -22,7 +22,7 @@
int write_config_file(char *file_name) {
FILE *file;
char const *str =
- "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+ "mapreduce.cluster.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
file = fopen(file_name, "w");
if (file == NULL) {
@@ -67,7 +67,7 @@
// Test obtaining a value for a key from the config
char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
"/tmp/testing3", "/tmp/testing4" };
- char *value = (char *) get_value("mapred.local.dir");
+ char *value = (char *) get_value("mapreduce.cluster.local.dir");
if (strcmp(value, "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4")
!= 0) {
printf("Obtaining a value for a key from the config failed.\n");
@@ -75,7 +75,7 @@
}
// Test the parsing of a multiple valued key from the config
- char **values = (char **)get_values("mapred.local.dir");
+ char **values = (char **)get_values("mapreduce.cluster.local.dir");
char **values_ptr = values;
int i = 0;
while (*values_ptr != NULL) {
@@ -87,12 +87,12 @@
values_ptr++;
}
- if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) {
+ if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing5") == 0) {
printf("Configuration should not contain /tmp/testing5! \n");
goto cleanup;
}
- if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) {
+ if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing4") != 0) {
printf("Configuration should contain /tmp/testing4! \n");
goto cleanup;
}
@@ -111,11 +111,24 @@
rmdir(hadoop_conf_dir);
}
+void test_get_user_directory() {
+ char *user_dir = (char *) get_user_directory("/tmp", "user");
+ printf("user_dir obtained is %s\n", user_dir);
+ int ret = 0;
+ if (strcmp(user_dir, "/tmp/taskTracker/user") != 0) {
+ ret = -1;
+ }
+ free(user_dir);
+ assert(ret == 0);
+}
+
void test_get_job_directory() {
- char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001");
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
printf("job_dir obtained is %s\n", job_dir);
int ret = 0;
- if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) {
+ if (strcmp(job_dir, "/tmp/taskTracker/user/jobcache/job_200906101234_0001")
+ != 0) {
ret = -1;
}
free(job_dir);
@@ -123,30 +136,34 @@
}
void test_get_attempt_directory() {
- char *attempt_dir = (char *) get_attempt_directory(
- "/tmp/taskTracker/jobcache/job_200906101234_0001",
- "attempt_200906112028_0001_m_000000_0");
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ printf("job_dir obtained is %s\n", job_dir);
+ char *attempt_dir = (char *) get_attempt_directory(job_dir,
+ "attempt_200906101234_0001_m_000000_0");
printf("attempt_dir obtained is %s\n", attempt_dir);
int ret = 0;
if (strcmp(
attempt_dir,
- "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+ "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906101234_0001_m_000000_0")
!= 0) {
ret = -1;
}
+ free(job_dir);
free(attempt_dir);
assert(ret == 0);
}
void test_get_task_launcher_file() {
- char *task_file = (char *) get_task_launcher_file(
- "/tmp/taskTracker/jobcache/job_200906101234_0001",
+ char *job_dir = (char *) get_job_directory("/tmp", "user",
+ "job_200906101234_0001");
+ char *task_file = (char *) get_task_launcher_file(job_dir,
"attempt_200906112028_0001_m_000000_0");
printf("task_file obtained is %s\n", task_file);
int ret = 0;
if (strcmp(
task_file,
- "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+ "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
!= 0) {
ret = -1;
}
@@ -168,13 +185,27 @@
}
int main(int argc, char **argv) {
- printf("Starting tests\n");
+ printf("\nStarting tests\n");
LOGFILE = stdout;
+
+ printf("\nTesting check_variable_against_config()\n");
test_check_variable_against_config();
+
+ printf("\nTesting get_user_directory()\n");
+ test_get_user_directory();
+
+ printf("\nTesting get_job_directory()\n");
test_get_job_directory();
+
+ printf("\nTesting get_attempt_directory()\n");
test_get_attempt_directory();
+
+ printf("\nTesting get_task_launcher_file()\n");
test_get_task_launcher_file();
+
+ printf("\nTesting get_task_log_dir()\n");
test_get_task_log_dir();
- printf("Finished tests\n");
+
+ printf("\nFinished tests\n");
return 0;
}
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib:713112
/hadoop/core/trunk/src/contrib:784664-785643
-/hadoop/mapreduce/trunk/src/contrib:804974-807678
+/hadoop/mapreduce/trunk/src/contrib:804974-884916
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/block_forensics/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/hdfs/src/contrib/block_forensics:713112
+/hadoop/mapreduce/trunk/src/contrib/block_forensics:807679-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml Sat Nov 28 20:26:01 2009
@@ -32,6 +32,7 @@
<property name="hadoop.root" location="${root}/../../../"/>
<property name="src.dir" location="${root}/src/java"/>
<property name="src.test" location="${root}/src/test"/>
+ <property name="src.test.data" location="${root}/src/test/data"/>
<property name="src.examples" location="${root}/src/examples"/>
<available file="${src.examples}" type="dir" property="examples.available"/>
@@ -45,6 +46,7 @@
<property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
<property name="build.classes" location="${build.dir}/classes"/>
<property name="build.test" location="${build.dir}/test"/>
+ <property name="test.build.extraconf" value="${build.test}/extraconf"/>
<property name="build.examples" location="${build.dir}/examples"/>
<property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
<!-- all jars together -->
@@ -56,6 +58,7 @@
value="http://java.sun.com/j2se/1.4/docs/api/"/>
<property name="build.encoding" value="ISO-8859-1"/>
+ <property name="dest.jar" value="hadoop-${version}-${name}.jar"/>
<fileset id="lib.jars" dir="${root}" includes="lib/*.jar"/>
@@ -68,8 +71,7 @@
<property name="ivy.jar" location="${hadoop.root}/ivy/ivy-${ivy.version}.jar"/>
<property name="ivy_repo_url"
value="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.version}/ivy-${ivy.version}.jar" />
- <property name="build.dir" location="build" />
- <property name="build.ivy.dir" location="${build.dir}/ivy" />
+ <property name="build.ivy.dir" location="${hadoop.root}/build/ivy" />
<property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" />
<property name="build.ivy.report.dir" location="${build.ivy.dir}/report" />
<property name="common.ivy.lib.dir" location="${build.ivy.lib.dir}/${ant.project.name}/common"/>
@@ -83,6 +85,7 @@
<pathelement location="${build.classes}"/>
<fileset refid="lib.jars"/>
<pathelement location="${hadoop.root}/build/classes"/>
+ <pathelement location="${hadoop.root}/build/tools"/>
<fileset dir="${hadoop.root}/lib">
<include name="**/*.jar" />
</fileset>
@@ -93,6 +96,7 @@
<!-- the unit test classpath -->
<path id="test.classpath">
<pathelement location="${build.test}" />
+ <pathelement location="${test.build.extraconf}" />
<pathelement location="${hadoop.root}/build/test/classes"/>
<pathelement location="${hadoop.root}/build/test/core/classes"/>
<pathelement location="${hadoop.root}/build/test/hdfs/classes"/>
@@ -118,6 +122,7 @@
<mkdir dir="${build.dir}"/>
<mkdir dir="${build.classes}"/>
<mkdir dir="${build.test}"/>
+ <mkdir dir="${build.test}/extraconf"/>
<mkdir dir="${build.examples}"/>
<mkdir dir="${hadoop.log.dir}"/>
<antcall target="init-contrib"/>
@@ -144,7 +149,7 @@
<!-- ======================================================= -->
<!-- Compile a Hadoop contrib's example files (if available) -->
<!-- ======================================================= -->
- <target name="compile-examples" depends="compile" if="examples.available">
+ <target name="compile-examples" depends="compile, ivy-retrieve-common" if="examples.available">
<echo message="contrib: ${name}"/>
<javac
encoding="${build.encoding}"
@@ -179,7 +184,7 @@
<target name="jar" depends="compile" unless="skip.contrib">
<echo message="contrib: ${name}"/>
<jar
- jarfile="${build.dir}/hadoop-${version}-${name}.jar"
+ jarfile="${build.dir}/${dest.jar}"
basedir="${build.classes}"
/>
</target>
@@ -206,7 +211,7 @@
<mkdir dir="${dist.dir}/contrib/${name}"/>
<copy todir="${dist.dir}/contrib/${name}" includeEmptyDirs="false" flatten="true">
<fileset dir="${build.dir}">
- <include name="hadoop-${version}-${name}.jar" />
+ <include name="${dest.jar}" />
</fileset>
</copy>
</target>
@@ -220,12 +225,14 @@
<mkdir dir="${hadoop.log.dir}"/>
<junit
printsummary="yes" showoutput="${test.output}"
- haltonfailure="no" fork="yes" maxmemory="256m"
+ haltonfailure="no" fork="yes" maxmemory="512m"
errorProperty="tests.failed" failureProperty="tests.failed"
timeout="${test.timeout}">
<sysproperty key="test.build.data" value="${build.test}/data"/>
<sysproperty key="build.test" value="${build.test}"/>
+ <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
+ <sysproperty key="src.test.data" value="${src.test.data}"/>
<sysproperty key="contrib.name" value="${name}"/>
<!-- requires fork=yes for:
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
/hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml Sat Nov 28 20:26:01 2009
@@ -56,6 +56,9 @@
<fileset dir="." includes="sqoop/build.xml"/>
<fileset dir="." includes="mrunit/build.xml"/>
<fileset dir="." includes="dynamic-scheduler/build.xml"/>
+ <fileset dir="." includes="gridmix/build.xml"/>
+ <fileset dir="." includes="vertica/build.xml"/>
+ <fileset dir="." includes="mumak/build.xml"/>
</subant>
<available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
<fail if="testsfailed">Tests failed!</fail>
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
/hadoop/core/trunk/src/contrib/build.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build.xml:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/build.xml:804974-884916
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
/hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml Sat Nov 28 20:26:01 2009
@@ -24,49 +24,35 @@
<artifact conf="master"/>
</publications>
<dependencies>
- <dependency org="commons-cli"
- name="commons-cli"
- rev="${commons-cli.version}"
- conf="common->default"/>
- <dependency org="commons-logging"
- name="commons-logging"
- rev="${commons-logging.version}"
- conf="common->default"/>
- <dependency org="junit"
- name="junit"
- rev="${junit.version}"
- conf="common->default"/>
- <dependency org="log4j"
- name="log4j"
- rev="${log4j.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="jetty-util"
- rev="${jetty-util.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="jetty"
- rev="${jetty.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="jsp-api-2.1"
- rev="${jetty.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="jsp-2.1"
- rev="${jetty.version}"
- conf="common->master"/>
- <dependency org="org.mortbay.jetty"
- name="servlet-api-2.5"
- rev="${servlet-api-2.5.version}"
- conf="common->master"/>
- <dependency org="commons-httpclient"
- name="commons-httpclient"
- rev="${commons-httpclient.version}"
- conf="common->master"/>
- <dependency org="org.apache.hadoop"
- name="avro"
- rev="1.0.0"
- conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-core"
+ rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-core-test"
+ rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs"
+ rev="${hadoop-hdfs.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs-test"
+ rev="${hadoop-hdfs.version}" conf="common->default"/>
+ <dependency org="commons-cli" name="commons-cli"
+ rev="${commons-cli.version}" conf="common->default"/>
+ <dependency org="commons-logging" name="commons-logging"
+ rev="${commons-logging.version}" conf="common->default"/>
+ <dependency org="junit" name="junit"
+ rev="${junit.version}" conf="common->default"/>
+ <dependency org="log4j" name="log4j"
+ rev="${log4j.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jetty-util"
+ rev="${jetty-util.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jetty"
+ rev="${jetty.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jsp-api-2.1"
+ rev="${jetty.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jsp-2.1"
+ rev="${jetty.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="servlet-api-2.5"
+ rev="${servlet-api-2.5.version}" conf="common->master"/>
+ <dependency org="commons-httpclient" name="commons-httpclient"
+ rev="${commons-httpclient.version}" conf="common->master"/>
+ <dependency org="org.apache.hadoop" name="avro"
+ rev="${avro.version}" conf="common->default"/>
</dependencies>
</ivy-module>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Sat Nov 28 20:26:01 2009
@@ -17,21 +17,44 @@
package org.apache.hadoop.mapred;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
/**
- * Class providing access to resource manager configuration.
+ * Class providing access to the capacity scheduler configuration, which
+ * includes settings for the {@link JobInitializationPoller} and default values
+ * for queue configuration. These are read from the file
+ * {@link CapacitySchedulerConf#SCHEDULER_CONF_FILE} on the CLASSPATH. The main
+ * queue configuration is defined in the file
+ * {@link QueueManager#QUEUE_CONF_FILE_NAME} on the CLASSPATH.
+ *
+ * <p>
*
- * Resource manager configuration involves setting up queues, and defining
- * various properties for the queues. These are typically read from a file
- * called capacity-scheduler.xml that must be in the classpath of the
- * application. The class provides APIs to get/set and reload the
- * configuration for the queues.
+ * This class also provides APIs to get and set the configuration for the
+ * queues.
*/
class CapacitySchedulerConf {
-
- /** Default file name from which the resource manager configuration is read. */
+
+ static final Log LOG = LogFactory.getLog(CapacitySchedulerConf.class);
+
+ static final String CAPACITY_PROPERTY = "capacity";
+
+ static final String SUPPORTS_PRIORITY_PROPERTY = "supports-priority";
+
+ static final String MAXIMUM_INITIALIZED_JOBS_PER_USER_PROPERTY =
+ "maximum-initialized-jobs-per-user";
+
+ static final String MINIMUM_USER_LIMIT_PERCENT_PROPERTY =
+ "minimum-user-limit-percent";
+
+ /** Default file name from which the capacity scheduler configuration is read. */
public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
private int defaultUlimitMinimum;
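
The short names above (capacity, supports-priority, maximum-initialized-jobs-per-user,
minimum-user-limit-percent) are the per-queue keys. Historically they were read as
prefixed Configuration keys of the form mapred.capacity-scheduler.queue.<queue>.<property>
(see toFullPropertyName(), which this patch keeps further down in the file). A minimal
standalone sketch of that expansion; the class name and the queue name "research" are
made up for illustration:

    // Illustrative only: mirrors toFullPropertyName(queue, property) kept in this file.
    public class QueuePropertyNameDemo {
      private static final String PREFIX = "mapred.capacity-scheduler.queue.";

      static String toFullPropertyName(String queue, String property) {
        return PREFIX + queue + "." + property;
      }

      public static void main(String[] args) {
        // Prints:
        //   mapred.capacity-scheduler.queue.research.capacity
        //   mapred.capacity-scheduler.queue.research.maximum-capacity
        System.out.println(toFullPropertyName("research", "capacity"));
        System.out.println(toFullPropertyName("research", "maximum-capacity"));
      }
    }
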
@@ -41,6 +64,9 @@
private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX =
"mapred.capacity-scheduler.queue.";
+ private Map<String, Properties> queueProperties
+ = new HashMap<String,Properties>();
+
/**
* If {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} is set to
* {@link JobConf#DISABLED_MEMORY_LIMIT}, this configuration will be used to
@@ -74,18 +100,12 @@
@Deprecated
static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
"mapred.capacity-scheduler.task.limit.maxpmem";
-
- /**
- * Configuration that provides the maximum cap for the map task in a queue
- * at any given point of time.
- */
- static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
-
+
/**
- * Configuration that provides the maximum cap for the reduce task in a queue
- * at any given point of time.
+ * A maximum capacity defines a limit beyond which a sub-queue
+ * cannot use the capacity of its parent queue.
*/
- static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
+ static final String MAX_CAPACITY_PROPERTY = "maximum-capacity";
/**
* The constant which defines the default initialization thread
@@ -102,29 +122,29 @@
private Configuration rmConf;
private int defaultMaxJobsPerUsersToInitialize;
-
+
/**
- * Create a new ResourceManagerConf.
+ * Create a new CapacitySchedulerConf.
* This method reads from the default configuration file mentioned in
- * {@link RM_CONF_FILE}, that must be present in the classpath of the
+ * {@link SCHEDULER_CONF_FILE}, which must be present in the classpath of the
* application.
*/
public CapacitySchedulerConf() {
rmConf = new Configuration(false);
- rmConf.addResource(SCHEDULER_CONF_FILE);
+ getCSConf().addResource(SCHEDULER_CONF_FILE);
initializeDefaults();
}
/**
- * Create a new ResourceManagerConf reading the specified configuration
+ * Create a new CapacitySchedulerConf reading the specified configuration
* file.
*
* @param configFile {@link Path} to the configuration file containing
- * the resource manager configuration.
+ * the Capacity scheduler configuration.
*/
public CapacitySchedulerConf(Path configFile) {
rmConf = new Configuration(false);
- rmConf.addResource(configFile);
+ getCSConf().addResource(configFile);
initializeDefaults();
}
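
As a rough usage sketch of the two constructors (hedged: CapacitySchedulerConf is
package-private, so callers have to live in org.apache.hadoop.mapred with the patched
class on the classpath, and the wrapper class name below is made up):

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.fs.Path;

    // Sketch only: the two ways the scheduler configuration gets constructed.
    class CapacitySchedulerConfUsage {
      static CapacitySchedulerConf fromClasspath() {
        // Reads capacity-scheduler.xml from the CLASSPATH.
        return new CapacitySchedulerConf();
      }

      static CapacitySchedulerConf fromFile(String configFile) {
        // Reads the scheduler settings from an explicit file instead.
        return new CapacitySchedulerConf(new Path(configFile));
      }
    }
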
@@ -133,14 +153,18 @@
* which is used by the Capacity Scheduler.
*/
private void initializeDefaults() {
- defaultUlimitMinimum = rmConf.getInt(
+ defaultUlimitMinimum = getCSConf().getInt(
"mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
- defaultSupportPriority = rmConf.getBoolean(
+ defaultSupportPriority = getCSConf().getBoolean(
"mapred.capacity-scheduler.default-supports-priority", false);
- defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
+ defaultMaxJobsPerUsersToInitialize = getCSConf().getInt(
"mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
2);
}
+
+ void setProperties(String queueName, Properties properties) {
+ this.queueProperties.put(queueName, properties);
+ }
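
With this change the per-queue values reach the scheduler as java.util.Properties
objects registered through setProperties(), instead of being read directly off the
Configuration. A hedged sketch of how a caller (queue-setup code or a test) might wire
that up; the queue name "research", the values, and the wrapper class are all made up:

    package org.apache.hadoop.mapred;

    import java.util.Properties;

    // Sketch only: register illustrative per-queue properties, then read them back
    // through the getters in this patch.
    class QueuePropertiesWiring {
      static void wire(CapacitySchedulerConf conf) {
        Properties props = new Properties();
        props.setProperty("capacity", "25");                     // CAPACITY_PROPERTY
        props.setProperty("supports-priority", "true");          // SUPPORTS_PRIORITY_PROPERTY
        props.setProperty("minimum-user-limit-percent", "20");   // MINIMUM_USER_LIMIT_PERCENT_PROPERTY
        conf.setProperties("research", props);

        System.out.println(conf.getCapacity("research"));                // 25.0
        System.out.println(conf.isPrioritySupported("research"));        // true
        System.out.println(conf.getMinimumUserLimitPercent("research")); // 20
      }
    }
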
/**
* Get the percentage of the cluster for the specified queue.
@@ -163,31 +187,60 @@
//In case of both capacity and default capacity not configured.
//Last check is if the configuration is specified and is marked as
//negative we throw exception
- String raw = rmConf.getRaw(toFullPropertyName(queue,
- "capacity"));
- if(raw == null) {
- return -1;
- }
- float result = rmConf.getFloat(toFullPropertyName(queue,
- "capacity"),
- -1);
- if (result < 0.0 || result > 100.0) {
+ String raw = getProperty(queue, CAPACITY_PROPERTY);
+
+ float result = this.getFloat(raw,-1);
+
+ if (result > 100.0) {
throw new IllegalArgumentException("Illegal capacity for queue " + queue +
" of " + result);
}
return result;
}
-
+
+ String getProperty(String queue, String property) {
+ if(!queueProperties.containsKey(queue))
+ throw new IllegalArgumentException("Invalid queue name " + queue);
+
+ //This check is still required because queues are sometimes created with
+ //null properties. This typically happens in tests.
+ if(queueProperties.get(queue) != null) {
+ return queueProperties.get(queue).getProperty(property);
+ }
+
+ return null;
+ }
+
/**
- * Sets the capacity of the given queue.
+ * Return the maximum percentage of the cluster capacity that can be
+ * used by the given queue.
+ * This percentage defines a limit beyond which a
+ * sub-queue cannot use the capacity of its parent queue.
+ * This provides a means to limit how much excess capacity a
+ * sub-queue can use. By default, there is no limit.
+ *
+ * The maximum-capacity-stretch of a queue must be greater than or
+ * equal to its capacity.
*
* @param queue name of the queue
- * @param capacity percent of the cluster for the queue.
+ * @return maximum capacity percent of cluster for the queue
*/
- public void setCapacity(String queue,float capacity) {
- rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
+ public float getMaxCapacity(String queue) {
+ String raw = getProperty(queue, MAX_CAPACITY_PROPERTY);
+ float result = getFloat(raw,-1);
+ result = (result <= 0) ? -1 : result;
+ if (result > 100.0) {
+ throw new IllegalArgumentException("Illegal maximum-capacity-stretch " +
+ "for queue " + queue +" of " + result);
+ }
+
+ if((result != -1) && (result < getCapacity(queue))) {
+ throw new IllegalArgumentException("maximum-capacity-stretch " +
+ "for a queue should be greater than capacity ");
+ }
+ return result;
}
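
Taken together, getCapacity() and getMaxCapacity() enforce a simple invariant: values
are percentages, anything above 100 is rejected, a non-positive or missing value is
normalized to -1 meaning "not configured", and a configured maximum-capacity may not
fall below the queue's capacity. A small standalone restatement of those checks (not
the scheduler's own code, just the same logic in isolation; the class name is made up):

    // Illustrative only: the capacity / maximum-capacity checks from this patch.
    public class CapacityInvariantDemo {
      static float validateMaxCapacity(float capacity, float maxCapacity) {
        maxCapacity = (maxCapacity <= 0) ? -1 : maxCapacity;   // non-positive means "unset"
        if (maxCapacity > 100.0f) {
          throw new IllegalArgumentException("Illegal maximum-capacity " + maxCapacity);
        }
        if (maxCapacity != -1 && maxCapacity < capacity) {
          throw new IllegalArgumentException(
              "maximum-capacity must be greater than or equal to capacity");
        }
        return maxCapacity;
      }

      public static void main(String[] args) {
        System.out.println(validateMaxCapacity(25f, 40f));  // 40.0
        System.out.println(validateMaxCapacity(25f, 0f));   // -1.0, i.e. no limit configured
        System.out.println(validateMaxCapacity(25f, 10f));  // throws IllegalArgumentException
      }
    }
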
-
+
/**
* Get whether priority is supported for this queue.
*
@@ -198,21 +251,10 @@
* @return Whether this queue supports priority or not.
*/
public boolean isPrioritySupported(String queue) {
- return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
- defaultSupportPriority);
- }
-
- /**
- * Set whether priority is supported for this queue.
- *
- *
- * @param queue name of the queue
- * @param value true, if the queue must support priorities, false otherwise.
- */
- public void setPrioritySupported(String queue, boolean value) {
- rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+ String raw = getProperty(queue, SUPPORTS_PRIORITY_PROPERTY);
+ return Boolean.parseBoolean(raw);
}
-
+
/**
* Get the minimum limit of resources for any user submitting jobs in
* this queue, in percentage.
@@ -229,8 +271,8 @@
*
*/
public int getMinimumUserLimitPercent(String queue) {
- int userLimit = rmConf.getInt(toFullPropertyName(queue,
- "minimum-user-limit-percent"), defaultUlimitMinimum);
+ String raw = getProperty(queue, MINIMUM_USER_LIMIT_PERCENT_PROPERTY);
+ int userLimit = getInt(raw,defaultUlimitMinimum);
if(userLimit <= 0 || userLimit > 100) {
throw new IllegalArgumentException("Invalid user limit : "
+ userLimit + " for queue : " + queue);
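
getMinimumUserLimitPercent() shows the fallback pattern used throughout this refactor:
the raw per-queue Properties value is parsed with the new getInt()/getFloat() helpers,
and a missing value falls back to the scheduler-wide default established in
initializeDefaults(). A standalone restatement of that pattern (the class and method
names below are illustrative, not the scheduler's):

    import java.util.Properties;

    // Illustrative only: per-queue value with a scheduler-wide default fallback.
    public class UserLimitFallbackDemo {
      static int minimumUserLimitPercent(Properties queueProps, int schedulerDefault) {
        String raw = queueProps.getProperty("minimum-user-limit-percent");
        int userLimit = parseIntOrDefault(raw, schedulerDefault);
        if (userLimit <= 0 || userLimit > 100) {
          throw new IllegalArgumentException("Invalid user limit : " + userLimit);
        }
        return userLimit;
      }

      static int parseIntOrDefault(String value, int defaultValue) {
        if (value == null) {
          return defaultValue;
        }
        try {
          return Integer.parseInt(value);
        } catch (NumberFormatException e) {
          return defaultValue;
        }
      }

      public static void main(String[] args) {
        Properties props = new Properties();                      // nothing configured
        System.out.println(minimumUserLimitPercent(props, 100));  // falls back to 100
        props.setProperty("minimum-user-limit-percent", "25");
        System.out.println(minimumUserLimitPercent(props, 100));  // 25
      }
    }
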
@@ -238,28 +280,6 @@
return userLimit;
}
- /**
- * Set the minimum limit of resources for any user submitting jobs in
- * this queue, in percentage.
- *
- * @param queue name of the queue
- * @param value minimum limit of resources for any user submitting jobs
- * in this queue
- */
- public void setMinimumUserLimitPercent(String queue, int value) {
- rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"),
- value);
- }
-
- /**
- * Reload configuration by clearing the information read from the
- * underlying configuration file.
- */
- public synchronized void reloadConfiguration() {
- rmConf.reloadConfiguration();
- initializeDefaults();
- }
-
static final String toFullPropertyName(String queue,
String property) {
return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
@@ -275,27 +295,16 @@
* or zero.
*/
public int getMaxJobsPerUserToInitialize(String queue) {
- int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
- "maximum-initialized-jobs-per-user"),
- defaultMaxJobsPerUsersToInitialize);
+ String raw =
+ getProperty(queue, MAXIMUM_INITIALIZED_JOBS_PER_USER_PROPERTY);
+ int maxJobsPerUser = getInt(raw,defaultMaxJobsPerUsersToInitialize);
if(maxJobsPerUser <= 0) {
throw new IllegalArgumentException(
"Invalid maximum jobs per user configuration " + maxJobsPerUser);
}
return maxJobsPerUser;
}
-
- /**
- * Sets the maximum number of jobs which are allowed to be initialized
- * for a user in the queue.
- *
- * @param queue queue name.
- * @param value maximum number of jobs allowed to be initialized per user.
- */
- public void setMaxJobsPerUserToInitialize(String queue, int value) {
- rmConf.setInt(toFullPropertyName(queue,
- "maximum-initialized-jobs-per-user"), value);
- }
+
/**
* Amount of time in milliseconds which poller thread and initialization
@@ -308,7 +317,7 @@
* @throws IllegalArgumentException if time is negative or zero.
*/
public long getSleepInterval() {
- long sleepInterval = rmConf.getLong(
+ long sleepInterval = getCSConf().getLong(
"mapred.capacity-scheduler.init-poll-interval",
INITIALIZATION_THREAD_POLLING_INTERVAL);
@@ -338,7 +347,7 @@
* in parallel.
*/
public int getMaxWorkerThreads() {
- int maxWorkerThreads = rmConf.getInt(
+ int maxWorkerThreads = getCSConf().getInt(
"mapred.capacity-scheduler.init-worker-threads",
MAX_INITIALIZATION_WORKER_THREADS);
if(maxWorkerThreads <= 0) {
@@ -347,62 +356,28 @@
}
return maxWorkerThreads;
}
- /**
- * Set the sleep interval which initialization poller would sleep before
- * it looks at the jobs in the job queue.
- *
- * @param interval sleep interval
- */
- public void setSleepInterval(long interval) {
- rmConf.setLong(
- "mapred.capacity-scheduler.init-poll-interval", interval);
- }
-
- /**
- * Sets number of threads which can be spawned to initialize jobs in
- * parallel.
- *
- * @param poolSize number of threads to be spawned to initialize jobs
- * in parallel.
- */
- public void setMaxWorkerThreads(int poolSize) {
- rmConf.setInt(
- "mapred.capacity-scheduler.init-worker-threads", poolSize);
- }
- /**
- * get the max map slots cap
- * @param queue
- * @return
- */
- public int getMaxMapCap(String queue) {
- return rmConf.getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
+ public Configuration getCSConf() {
+ return rmConf;
}
- /**
- * Used for testing
- * @param queue
- * @param val
- */
- public void setMaxMapCap(String queue,int val) {
- rmConf.setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
- }
-
- /**
- * get the max reduce slots cap
- * @param queue
- * @return
- */
- public int getMaxReduceCap(String queue) {
- return rmConf.getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);
+ float getFloat(String valueString,float defaultValue) {
+ if (valueString == null)
+ return defaultValue;
+ try {
+ return Float.parseFloat(valueString);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
}
- /**
- * Used for testing
- * @param queue
- * @param val
- */
- public void setMaxReduceCap(String queue,int val) {
- rmConf.setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
+ int getInt(String valueString,int defaultValue) {
+ if (valueString == null)
+ return defaultValue;
+ try {
+ return Integer.parseInt(valueString);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
}
}
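
One note on the getFloat()/getInt() helpers that close out this class: a value that
fails to parse (for example a typo in the queue configuration) is treated exactly like
an absent value and silently falls back to the supplied default rather than failing
fast. A minimal standalone illustration of that behavior (class and method names are
made up):

    // Illustrative only: null and malformed inputs both yield the default.
    public class LenientParseDemo {
      static float parseFloatOrDefault(String value, float defaultValue) {
        if (value == null) {
          return defaultValue;
        }
        try {
          return Float.parseFloat(value);
        } catch (NumberFormatException e) {
          return defaultValue;   // a malformed value behaves like an unset one
        }
      }

      public static void main(String[] args) {
        System.out.println(parseFloatOrDefault(null, -1f));      // -1.0 (not configured)
        System.out.println(parseFloatOrDefault("25.5", -1f));    // 25.5
        System.out.println(parseFloatOrDefault("twenty", -1f));  // -1.0 (typo falls back too)
      }
    }
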