You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [3/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/generateData.sh Sat Nov 28 20:26:01 2009
@@ -29,51 +29,51 @@
 
 ${HADOOP_HOME}/bin/hadoop jar \
   ${EXAMPLE_JAR} randomtextwriter \
-  -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
-  -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
-  -D test.randomtextwrite.min_words_key=5 \
-  -D test.randomtextwrite.max_words_key=10 \
-  -D test.randomtextwrite.min_words_value=100 \
-  -D test.randomtextwrite.max_words_value=10000 \
-  -D mapred.output.compress=true \
+  -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+  -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+  -D mapreduce.randomtextwriter.minwordskey=5 \
+  -D mapreduce.randomtextwriter.maxwordskey=10 \
+  -D mapreduce.randomtextwriter.minwordsvalue=100 \
+  -D mapreduce.randomtextwriter.maxwordsvalue=10000 \
+  -D mapreduce.output.fileoutputformat.compress=true \
   -D mapred.map.output.compression.type=BLOCK \
-  -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
+  -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
   ${VARCOMPSEQ} &
 
 ${HADOOP_HOME}/bin/hadoop jar \
   ${EXAMPLE_JAR} randomtextwriter \
-  -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
-  -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
-  -D test.randomtextwrite.min_words_key=5 \
-  -D test.randomtextwrite.max_words_key=5 \
-  -D test.randomtextwrite.min_words_value=100 \
-  -D test.randomtextwrite.max_words_value=100 \
-  -D mapred.output.compress=true \
+  -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+  -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+  -D mapreduce.randomtextwriter.minwordskey=5 \
+  -D mapreduce.randomtextwriter.maxwordskey=5 \
+  -D mapreduce.randomtextwriter.minwordsvalue=100 \
+  -D mapreduce.randomtextwriter.maxwordsvalue=100 \
+  -D mapreduce.output.fileoutputformat.compress=true \
   -D mapred.map.output.compression.type=BLOCK \
-  -outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
+  -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
   ${FIXCOMPSEQ} &
 
 ${HADOOP_HOME}/bin/hadoop jar \
   ${EXAMPLE_JAR} randomtextwriter \
-  -D test.randomtextwrite.total_bytes=${UNCOMPRESSED_DATA_BYTES} \
-  -D test.randomtextwrite.bytes_per_map=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
-  -D test.randomtextwrite.min_words_key=1 \
-  -D test.randomtextwrite.max_words_key=10 \
-  -D test.randomtextwrite.min_words_value=0 \
-  -D test.randomtextwrite.max_words_value=200 \
-  -D mapred.output.compress=false \
-  -outFormat org.apache.hadoop.mapred.TextOutputFormat \
+  -D mapreduce.randomtextwriter.totalbytes=${UNCOMPRESSED_DATA_BYTES} \
+  -D mapreduce.randomtextwriter.bytespermap=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+  -D mapreduce.randomtextwriter.minwordskey=1 \
+  -D mapreduce.randomtextwriter.maxwordskey=10 \
+  -D mapreduce.randomtextwriter.minwordsvalue=0 \
+  -D mapreduce.randomtextwriter.maxwordsvalue=200 \
+  -D mapreduce.output.fileoutputformat.compress=false \
+  -outFormat org.apache.hadoop.mapreduce.lib.output.TextOutputFormat \
   ${VARINFLTEXT} &
 
 ${HADOOP_HOME}/bin/hadoop jar \
   ${EXAMPLE_JAR} randomtextwriter \
-  -D test.randomtextwrite.total_bytes=${INDIRECT_DATA_BYTES} \
-  -D test.randomtextwrite.bytes_per_map=$((${INDIRECT_DATA_BYTES} / ${INDIRECT_DATA_FILES})) \
-  -D test.randomtextwrite.min_words_key=5 \
-  -D test.randomtextwrite.max_words_key=5 \
-  -D test.randomtextwrite.min_words_value=20 \
-  -D test.randomtextwrite.max_words_value=20 \
-  -D mapred.output.compress=true \
+  -D mapreduce.randomtextwriter.totalbytes=${INDIRECT_DATA_BYTES} \
+  -D mapreduce.randomtextwriter.bytespermap=$((${INDIRECT_DATA_BYTES} / ${INDIRECT_DATA_FILES})) \
+  -D mapreduce.randomtextwriter.minwordskey=5 \
+  -D mapreduce.randomtextwriter.maxwordskey=5 \
+  -D mapreduce.randomtextwriter.minwordsvalue=20 \
+  -D mapreduce.randomtextwriter.maxwordsvalue=20 \
+  -D mapreduce.output.fileoutputformat.compress=true \
   -D mapred.map.output.compression.type=BLOCK \
-  -outFormat org.apache.hadoop.mapred.TextOutputFormat \
+  -outFormat org.apache.hadoop.mapreduce.lib.output.TextOutputFormat \
   ${FIXCOMPTEXT} &

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.large Sat Nov 28 20:26:01 2009
@@ -12,5 +12,5 @@
 ${HADOOP_HOME}/bin/hadoop dfs -rmr $OUTDIR
 
 
-${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapred.output.key.class=org.apache.hadoop.io.Text,mapred.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
+${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapreduce.job.output.key.class=org.apache.hadoop.io.Text,mapreduce.job.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.medium Sat Nov 28 20:26:01 2009
@@ -12,5 +12,5 @@
 ${HADOOP_HOME}/bin/hadoop dfs -rmr $OUTDIR
 
 
-${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapred.output.key.class=org.apache.hadoop.io.Text,mapred.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
+${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapreduce.job.output.key.class=org.apache.hadoop.io.Text,mapreduce.job.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix/pipesort/text-sort.small Sat Nov 28 20:26:01 2009
@@ -12,5 +12,5 @@
 ${HADOOP_HOME}/bin/hadoop dfs -rmr $OUTDIR
 
 
-${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapred.output.key.class=org.apache.hadoop.io.Text,mapred.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
+${HADOOP_HOME}/bin/hadoop pipes -input $INDIR -output $OUTDIR -inputformat org.apache.hadoop.mapred.KeyValueTextInputFormat -program ${GRID_MIX_PROG}/pipes-sort -reduces $NUM_OF_REDUCERS -jobconf mapreduce.job.output.key.class=org.apache.hadoop.io.Text,mapreduce.job.output.value.class=org.apache.hadoop.io.Text -writer org.apache.hadoop.mapred.TextOutputFormat
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/generateGridmix2data.sh Sat Nov 28 20:26:01 2009
@@ -53,13 +53,13 @@
 
 ${HADOOP_HOME}/bin/hadoop jar \
   ${EXAMPLE_JAR} randomtextwriter \
-  -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
-  -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
-  -D test.randomtextwrite.min_words_key=5 \
-  -D test.randomtextwrite.max_words_key=10 \
-  -D test.randomtextwrite.min_words_value=100 \
-  -D test.randomtextwrite.max_words_value=10000 \
-  -D mapred.output.compress=true \
+  -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+  -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+  -D mapreduce.randomtextwriter.minwordskey=5 \
+  -D mapreduce.randomtextwriter.maxwordskey=10 \
+  -D mapreduce.randomtextwriter.minwordsvalue=100 \
+  -D mapreduce.randomtextwriter.maxwordsvalue=10000 \
+  -D mapreduce.output.fileoutputformat.compress=true \
   -D mapred.map.output.compression.type=BLOCK \
   -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
   ${VARCOMPSEQ} &
@@ -67,13 +67,13 @@
 
 ${HADOOP_HOME}/bin/hadoop jar \
   ${EXAMPLE_JAR} randomtextwriter \
-  -D test.randomtextwrite.total_bytes=${COMPRESSED_DATA_BYTES} \
-  -D test.randomtextwrite.bytes_per_map=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
-  -D test.randomtextwrite.min_words_key=5 \
-  -D test.randomtextwrite.max_words_key=5 \
-  -D test.randomtextwrite.min_words_value=100 \
-  -D test.randomtextwrite.max_words_value=100 \
-  -D mapred.output.compress=true \
+  -D mapreduce.randomtextwriter.totalbytes=${COMPRESSED_DATA_BYTES} \
+  -D mapreduce.randomtextwriter.bytespermap=$((${COMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+  -D mapreduce.randomtextwriter.minwordskey=5 \
+  -D mapreduce.randomtextwriter.maxwordskey=5 \
+  -D mapreduce.randomtextwriter.minwordsvalue=100 \
+  -D mapreduce.randomtextwriter.maxwordsvalue=100 \
+  -D mapreduce.output.fileoutputformat.compress=true \
   -D mapred.map.output.compression.type=BLOCK \
   -outFormat org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat \
   ${FIXCOMPSEQ} &
@@ -81,13 +81,13 @@
 
 ${HADOOP_HOME}/bin/hadoop jar \
   ${EXAMPLE_JAR} randomtextwriter \
-  -D test.randomtextwrite.total_bytes=${UNCOMPRESSED_DATA_BYTES} \
-  -D test.randomtextwrite.bytes_per_map=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
-  -D test.randomtextwrite.min_words_key=1 \
-  -D test.randomtextwrite.max_words_key=10 \
-  -D test.randomtextwrite.min_words_value=0 \
-  -D test.randomtextwrite.max_words_value=200 \
-  -D mapred.output.compress=false \
+  -D mapreduce.randomtextwriter.totalbytes=${UNCOMPRESSED_DATA_BYTES} \
+  -D mapreduce.randomtextwriter.bytespermap=$((${UNCOMPRESSED_DATA_BYTES} / ${NUM_MAPS})) \
+  -D mapreduce.randomtextwriter.minwordskey=1 \
+  -D mapreduce.randomtextwriter.maxwordskey=10 \
+  -D mapreduce.randomtextwriter.minwordsvalue=0 \
+  -D mapreduce.randomtextwriter.maxwordsvalue=200 \
+  -D mapreduce.output.fileoutputformat.compress=false \
   -outFormat org.apache.hadoop.mapreduce.lib.output.TextOutputFormat \
   ${VARINFLTEXT} &
 

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/gridmix_config.xml Sat Nov 28 20:26:01 2009
@@ -39,7 +39,7 @@
 
 <property>
   <name>streamSort.smallJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 
@@ -74,7 +74,7 @@
 </property>
 <property>
   <name>streamSort.mediumJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -131,7 +131,7 @@
 </property>
 <property>
   <name>javaSort.smallJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -160,7 +160,7 @@
 </property>
 <property>
   <name>javaSort.mediumJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -217,7 +217,7 @@
 </property>
 <property>
   <name>combiner.smallJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARINFLTEXT}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -246,7 +246,7 @@
 </property>
 <property>
   <name>combiner.mediumJobs.inputFiles</name>
-  <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARINFLTEXT}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -303,7 +303,7 @@
 </property>
 <property>
   <name>monsterQuery.smallJobs.inputFiles</name>
-  <value>${FIXCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+  <value>${FIXCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -332,7 +332,7 @@
 </property>
 <property>
   <name>monsterQuery.mediumJobs.inputFiles</name>
-  <value>${FIXCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${FIXCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -390,7 +390,7 @@
 
 <property>
   <name>webdataScan.smallJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -413,7 +413,7 @@
 
 <property>
   <name>webdataScan.mediumJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>
@@ -469,7 +469,7 @@
 </property>
 <property>
   <name>webdataSort.smallJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-00000,part-00001,part-00002}</value>
+  <value>${VARCOMPSEQ}/{part-*-00000,part-*-00001,part-*-00002}</value>
   <description></description>
 </property>
 <property>
@@ -498,7 +498,7 @@
 </property>
 <property>
   <name>webdataSort.mediumJobs.inputFiles</name>
-  <value>${VARCOMPSEQ}/{part-000*0,part-000*1,part-000*2}</value>
+  <value>${VARCOMPSEQ}/{part-*-000*0,part-*-000*1,part-*-000*2}</value>
   <description></description>
 </property>
 <property>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2 (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/benchmarks/gridmix2/rungridmix_2 Sat Nov 28 20:26:01 2009
@@ -30,7 +30,7 @@
 
 export HADOOP_CLASSPATH=${APP_JAR}:${EXAMPLE_JAR}:${STREAMING_JAR}
 export LIBJARS=${APP_JAR},${EXAMPLE_JAR},${STREAMING_JAR}
-${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapred.GridMixRunner -libjars ${LIBJARS}
+${HADOOP_HOME}/bin/hadoop jar gridmix.jar org.apache.hadoop.mapreduce.GridMixRunner -libjars ${LIBJARS}
 
 Date=`date +%F-%H-%M-%S-%N`
 echo $Date >  $1_end.out

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++:713112
 /hadoop/core/trunk/src/c++:776175-784663
-/hadoop/mapreduce/trunk/src/c++:804974-807678
+/hadoop/mapreduce/trunk/src/c++:804974-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/pipes/impl/HadoopPipes.cc
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/pipes/impl/HadoopPipes.cc?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/pipes/impl/HadoopPipes.cc (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/pipes/impl/HadoopPipes.cc Sat Nov 28 20:26:01 2009
@@ -701,8 +701,8 @@
       }
       if (reducer != NULL) {
         int64_t spillSize = 100;
-        if (jobConf->hasKey("io.sort.mb")) {
-          spillSize = jobConf->getInt("io.sort.mb");
+        if (jobConf->hasKey("mapreduce.task.io.sort.mb")) {
+          spillSize = jobConf->getInt("mapreduce.task.io.sort.mb");
         }
         writer = new CombineRunner(spillSize * 1024 * 1024, this, reducer, 
                                    uplink, partitioner, numReduces);
@@ -937,7 +937,7 @@
    */
   void* ping(void* ptr) {
     TaskContextImpl* context = (TaskContextImpl*) ptr;
-    char* portStr = getenv("hadoop.pipes.command.port");
+    char* portStr = getenv("mapreduce.pipes.command.port");
     int MAX_RETRIES = 3;
     int remaining_retries = MAX_RETRIES;
     while (!context->isDone()) {
@@ -990,7 +990,7 @@
     try {
       TaskContextImpl* context = new TaskContextImpl(factory);
       Protocol* connection;
-      char* portStr = getenv("hadoop.pipes.command.port");
+      char* portStr = getenv("mapreduce.pipes.command.port");
       int sock = -1;
       FILE* stream = NULL;
       FILE* outStream = NULL;
@@ -1024,8 +1024,8 @@
                                      + strerror(errno));
 
         connection = new BinaryProtocol(stream, context, outStream);
-      } else if (getenv("hadoop.pipes.command.file")) {
-        char* filename = getenv("hadoop.pipes.command.file");
+      } else if (getenv("mapreduce.pipes.commandfile")) {
+        char* filename = getenv("mapreduce.pipes.commandfile");
         string outFilename = filename;
         outFilename += ".out";
         stream = fopen(filename, "r");

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/main.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/main.c?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/main.c (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/main.c Sat Nov 28 20:26:01 2009
@@ -105,10 +105,16 @@
   fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
 
   switch (command) {
+  case INITIALIZE_USER:
+    exit_code = initialize_user(user_detail->pw_name);
+    break;
   case INITIALIZE_JOB:
     job_id = argv[optind++];
     exit_code = initialize_job(job_id, user_detail->pw_name);
     break;
+  case INITIALIZE_DISTRIBUTEDCACHE:
+    exit_code = initialize_distributed_cache(user_detail->pw_name);
+    break;
   case LAUNCH_TASK_JVM:
     tt_root = argv[optind++];
     job_id = argv[optind++];
@@ -129,6 +135,13 @@
     task_pid = argv[optind++];
     exit_code = kill_user_task(user_detail->pw_name, task_pid, SIGKILL);
     break;
+  case RUN_DEBUG_SCRIPT:
+    tt_root = argv[optind++];
+    job_id = argv[optind++];
+    task_id = argv[optind++];
+    exit_code
+        = run_debug_script_as_user(user_detail->pw_name, job_id, task_id, tt_root);
+    break;
   default:
     exit_code = INVALID_COMMAND_PROVIDED;
   }

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/task-controller.c?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.c (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.c Sat Nov 28 20:26:01 2009
@@ -120,16 +120,26 @@
 /**
  * Utility function to concatenate argB to argA using the concat_pattern
  */
-char *concatenate(const char *argA, const char *argB, char *concat_pattern,
-    char *return_path_name) {
-  if (argA == NULL || argB == NULL) {
-    fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
-        return_path_name);
-    return NULL;
+char *concatenate(char *concat_pattern, char *return_path_name, int numArgs,
+    ...) {
+  va_list ap;
+  va_start(ap, numArgs);
+  int strlen_args = 0;
+  char *arg = NULL;
+  int j;
+  for (j = 0; j < numArgs; j++) {
+    arg = va_arg(ap, char*);
+    if (arg == NULL) {
+      fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+          return_path_name);
+      return NULL;
+    }
+    strlen_args += strlen(arg);
   }
+  va_end(ap);
 
   char *return_path = NULL;
-  int str_len = strlen(concat_pattern) + strlen(argA) + strlen(argB);
+  int str_len = strlen(concat_pattern) + strlen_args;
 
   return_path = (char *) malloc(sizeof(char) * (str_len + 1));
   if (return_path == NULL) {
@@ -137,47 +147,66 @@
     return NULL;
   }
   memset(return_path, '\0', str_len + 1);
-  snprintf(return_path, str_len, concat_pattern, argA, argB);
+  va_start(ap, numArgs);
+  vsnprintf(return_path, str_len, concat_pattern, ap);
+  va_end(ap);
   return return_path;
 }
 
 /**
- * Get the job-directory path from tt_root and job-id
+ * Get the job-directory path from tt_root, user name and job-id
+ */
+char *get_job_directory(const char * tt_root, const char *user,
+    const char *jobid) {
+  return concatenate(TT_JOB_DIR_PATTERN, "job_dir_path", 3, tt_root, user,
+      jobid);
+}
+
+/**
+ * Get the user directory of a particular user
  */
-char *get_job_directory(const char * tt_root, const char *jobid) {
-  return concatenate(tt_root, jobid, TT_JOB_DIR_PATTERN, "job_dir_path");
+char *get_user_directory(const char *tt_root, const char *user) {
+  return concatenate(USER_DIR_PATTERN, "user_dir_path", 2, tt_root, user);
+}
+
+/**
+ * Get the distributed cache directory for a particular user
+ */
+char *get_distributed_cache_directory(const char *tt_root, const char *user) {
+  return concatenate(USER_DISTRIBUTED_CACHE_DIR_PATTERN, "dist_cache_path", 2,
+      tt_root, user);
 }
 
 char *get_job_work_directory(const char *job_dir) {
-  return concatenate(job_dir, "", JOB_DIR_TO_JOB_WORK_PATTERN,
-      "job_work_dir_path");
+  return concatenate(JOB_DIR_TO_JOB_WORK_PATTERN, "job_work_dir_path", 2,
+      job_dir, "");
 }
 /**
  * Get the attempt directory for the given attempt_id
  */
 char *get_attempt_directory(const char *job_dir, const char *attempt_id) {
-  return concatenate(job_dir, attempt_id, JOB_DIR_TO_ATTEMPT_DIR_PATTERN,
-      "attempt_dir_path");
+  return concatenate(JOB_DIR_TO_ATTEMPT_DIR_PATTERN, "attempt_dir_path", 2,
+      job_dir, attempt_id);
 }
 
 /*
  * Get the path to the task launcher file which is created by the TT
  */
 char *get_task_launcher_file(const char *job_dir, const char *attempt_dir) {
-  return concatenate(job_dir, attempt_dir, TASK_SCRIPT_PATTERN,
-      "task_script_path");
+  return concatenate(TASK_SCRIPT_PATTERN, "task_script_path", 2, job_dir,
+      attempt_dir);
 }
 
 /**
  * Get the log directory for the given attempt.
  */
 char *get_task_log_dir(const char *log_dir, const char *attempt_id) {
-  return concatenate(log_dir, attempt_id, ATTEMPT_LOG_DIR_PATTERN,
-      "task_log_dir");
+  return concatenate(ATTEMPT_LOG_DIR_PATTERN, "task_log_dir", 2, log_dir,
+      attempt_id);
 }
 
 /**
- * Function to check if the passed tt_root is present in mapred.local.dir
+ * Function to check if the passed tt_root is present in mapreduce.cluster.local.dir
  * the task-controller is configured with.
  */
 int check_tt_root(const char *tt_root) {
@@ -332,8 +361,19 @@
     if (!process_path) {
       continue;
     }
+    if (compare_ownership(uid, gid, entry->fts_path) == 0) {
+      // Proper permissions are already set on this path.
+      // This might happen with distributed cache.
+#ifdef DEBUG
+      fprintf(
+          LOGFILE,
+          "already has private permissions. Not trying to change again for %s",
+          entry->fts_path);
+#endif
+      continue;
+    }
 
-    if (check_ownership(getuid(), getgid(), entry->fts_path) != 0) {
+    if (check_ownership(entry->fts_path) != 0) {
       fprintf(LOGFILE,
           "Invalid file path. %s not user/group owned by the tasktracker.\n",
           entry->fts_path);
@@ -359,8 +399,8 @@
  * Function to prepare the attempt directories for the task JVM.
  * This is done by changing the ownership of the attempt directory recursively
  * to the job owner. We do the following:
- *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid/$attemptid/
- *     *  sudo chmod 2770 -R taskTracker/jobcache/$jobid/$attemptid/
+ *  *  sudo chown user:mapred -R taskTracker/$user/jobcache/$jobid/$attemptid/
+ *  *  sudo chmod 2770 -R taskTracker/$user/jobcache/$jobid/$attemptid/
  */
 int prepare_attempt_directories(const char *job_id, const char *attempt_id,
     const char *user) {
@@ -369,13 +409,13 @@
     return INVALID_ARGUMENT_NUMBER;
   }
 
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
   if (get_user_details(user) < 0) {
     fprintf(LOGFILE, "Couldn't get the user details of %s.\n", user);
     return INVALID_USER_NAME;
   }
 
-  int tasktracker_gid = getgid();
-
   char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
 
   if (local_dir == NULL) {
@@ -395,14 +435,14 @@
   char **local_dir_ptr = local_dir;
   int failed = 0;
   while (*local_dir_ptr != NULL) {
-    job_dir = get_job_directory(*local_dir_ptr, job_id);
+    job_dir = get_job_directory(*local_dir_ptr, user, job_id);
     if (job_dir == NULL) {
       fprintf(LOGFILE, "Couldn't get job directory for %s.\n", job_id);
       failed = 1;
       break;
     }
 
-    // prepare attempt-dir in each of the mapred_local_dir
+    // prepare attempt-dir in each of the mapreduce.cluster.local.dir
     attempt_dir = get_attempt_directory(job_dir, attempt_id);
     if (attempt_dir == NULL) {
       fprintf(LOGFILE, "Couldn't get attempt directory for %s.\n", attempt_id);
@@ -485,7 +525,7 @@
     }
   }
 
-  int tasktracker_gid = getgid();
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
   if (secure_path(task_log_dir, user_detail->pw_uid, tasktracker_gid, S_IRWXU
       | S_IRWXG, S_ISGID | S_IRWXU | S_IRWXG) != 0) {
     // setgid on dirs but not files, 770. As of now, there are no files though
@@ -507,22 +547,122 @@
   return 0;
 }
 
+/**
+ * Compare ownership of a file with the given ids.
+ */
+int compare_ownership(uid_t uid, gid_t gid, char *path) {
+  struct stat filestat;
+  if (stat(path, &filestat) != 0) {
+    return UNABLE_TO_STAT_FILE;
+  }
+  if (uid == filestat.st_uid && gid == filestat.st_gid) {
+    return 0;
+  }
+  return 1;
+}
+
 /*
- * Function to check if a user/group actually owns the file.
+ * Function to check if the TaskTracker actually owns the file.
   */
-int check_ownership(uid_t uid, gid_t gid, char *path) {
+int check_ownership(char *path) {
   struct stat filestat;
   if (stat(path, &filestat) != 0) {
     return UNABLE_TO_STAT_FILE;
   }
-  // check user/group.
-  if (uid != filestat.st_uid || gid != filestat.st_gid) {
+  // check user/group. User should be TaskTracker user, group can either be
+  // TaskTracker's primary group or the special group to which binary's
+  // permissions are set.
+  if (getuid() != filestat.st_uid || (getgid() != filestat.st_gid && getegid()
+      != filestat.st_gid)) {
     return FILE_NOT_OWNED_BY_TASKTRACKER;
   }
   return 0;
 }
 
 /**
+ * Function to initialize the user directories of a user.
+ * It does the following:
+ *     *  sudo chown user:mapred -R taskTracker/$user
+ *     *  sudo chmod 2570 -R taskTracker/$user
+ * This is done once for every user on the TaskTracker.
+ */
+int initialize_user(const char *user) {
+
+  if (user == NULL) {
+    fprintf(LOGFILE, "user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+#endif
+
+  char *user_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) {
+    user_dir = get_user_directory(*local_dir_ptr, user);
+    if (user_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get userdir directory for %s.\n", user);
+      failed = 1;
+      break;
+    }
+
+    struct stat filestat;
+    if (stat(user_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE, "user_dir %s doesn't exist. Not doing anything.\n",
+            user_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the user_dir %s\n",
+            user_dir);
+        failed = 1;
+        free(user_dir);
+        break;
+      }
+    } else if (secure_path(user_dir, user_detail->pw_uid, tasktracker_gid,
+        S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR | S_IXUSR | S_IRWXG)
+        != 0) {
+      // No setgid on files and setgid on dirs, 570
+      fprintf(LOGFILE, "Failed to secure the user_dir %s\n",
+          user_dir);
+      failed = 1;
+      free(user_dir);
+      break;
+    }
+
+    local_dir_ptr++;
+    free(user_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failed) {
+    return INITIALIZE_USER_FAILED;
+  }
+  return 0;
+}
+
+/**
  * Function to prepare the job directories for the task JVM.
  * We do the following:
  *     *  sudo chown user:mapred -R taskTracker/jobcache/$jobid
@@ -540,7 +680,7 @@
     return INVALID_USER_NAME;
   }
 
-  gid_t tasktracker_gid = getgid(); // TaskTracker's group-id
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
 
   char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
   if (local_dir == NULL) {
@@ -559,7 +699,7 @@
   char **local_dir_ptr = local_dir;
   int failed = 0;
   while (*local_dir_ptr != NULL) {
-    job_dir = get_job_directory(*local_dir_ptr, jobid);
+    job_dir = get_job_directory(*local_dir_ptr, user, jobid);
     if (job_dir == NULL) {
       fprintf(LOGFILE, "Couldn't get job directory for %s.\n", jobid);
       failed = 1;
@@ -604,6 +744,7 @@
               "job_work_dir %s doesn't exist. Not doing anything.\n",
               job_work_dir);
 #endif
+          free(job_work_dir);
         } else {
           // stat failed because of something else!
           fprintf(LOGFILE, "Failed to stat the job_work_dir %s\n",
@@ -637,6 +778,97 @@
 }
 
 /**
+ * Function to initialize the distributed cache files of a user.
+ * It does the following:
+ *     *  sudo chown user:mapred -R taskTracker/$user/distcache
+ *     *  sudo chmod 2570 -R taskTracker/$user/distcache
+ * This is done once per JVM launch. Tasks reusing JVMs just create
+ * symbolic links themselves and so there isn't anything specific to do in
+ * that case.
+ * Sometimes a task uses the whole or part of a directory structure in
+ * taskTracker/$user/distcache. In this case, some paths have already been
+ * given proper private permissions by this same function during a previous
+ * JVM launch. In the current invocation, we only do the chown/chmod
+ * operation on files/directories that are newly created by the
+ * TaskTracker (i.e. those that are not yet owned by user:mapred).
+ */
+int initialize_distributed_cache(const char *user) {
+  // A user name is mandatory; bail out before reading any configuration.
+  if (user == NULL) {
+    fprintf(LOGFILE, "user passed is null.\n");
+    return INVALID_ARGUMENT_NUMBER;
+  }
+
+  if (get_user_details(user) < 0) {
+    fprintf(LOGFILE, "Couldn't get the user details of %s\n", user);
+    return INVALID_USER_NAME;
+  }
+
+  gid_t tasktracker_gid = getegid(); // the group permissions of the binary.
+
+  char **local_dir = (char **) get_values(TT_SYS_DIR_KEY);
+  if (local_dir == NULL) {
+    fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY);
+    cleanup();
+    return INVALID_TT_ROOT;
+  }
+
+  char *full_local_dir_str = (char *) get_value(TT_SYS_DIR_KEY);
+#ifdef DEBUG
+  fprintf(LOGFILE, "Value from config for %s is %s.\n", TT_SYS_DIR_KEY,
+      full_local_dir_str);
+#endif
+
+  char *distcache_dir;
+  char **local_dir_ptr = local_dir;
+  int failed = 0;
+  while (*local_dir_ptr != NULL) { // one pass per configured local dir
+    distcache_dir = get_distributed_cache_directory(*local_dir_ptr, user);
+    if (distcache_dir == NULL) {
+      fprintf(LOGFILE, "Couldn't get distcache directory for %s.\n", user);
+      failed = 1;
+      break;
+    }
+    // A missing directory is not an error: there is nothing to secure yet.
+    struct stat filestat;
+    if (stat(distcache_dir, &filestat) != 0) {
+      if (errno == ENOENT) {
+#ifdef DEBUG
+        fprintf(LOGFILE, "distcache_dir %s doesn't exist. Not doing anything.\n",
+            distcache_dir);
+#endif
+      } else {
+        // stat failed because of something else!
+        fprintf(LOGFILE, "Failed to stat the distcache_dir %s\n",
+            distcache_dir);
+        failed = 1;
+        free(distcache_dir); // release the path before leaving the loop
+        break;
+      }
+    } else if (secure_path(distcache_dir, user_detail->pw_uid,
+        tasktracker_gid, S_IRUSR | S_IXUSR | S_IRWXG, S_ISGID | S_IRUSR
+            | S_IXUSR | S_IRWXG) != 0) {
+      // 570 everywhere: no setgid on files, setgid on dirs
+      fprintf(LOGFILE, "Failed to secure the distcache_dir %s\n",
+          distcache_dir);
+      failed = 1;
+      free(distcache_dir); // release the path before leaving the loop
+      break;
+    }
+
+    local_dir_ptr++;
+    free(distcache_dir);
+  }
+  free(local_dir);
+  free(full_local_dir_str);
+  cleanup();
+  if (failed) {
+    return INITIALIZE_DISTCACHE_FAILED;
+  }
+  return 0;
+}
+
+/**
  * Function used to initialize task. Prepares attempt_dir, jars_dir and
  * log_dir to be accessible by the child
  */
@@ -678,25 +910,41 @@
 }
 
 /*
- * Function used to launch a task as the provided user. It does the following :
- * 1) Checks if the tt_root passed is found in mapred.local.dir
- * 2) Prepares attempt_dir and log_dir to be accessible by the child
- * 3) Uses get_task_launcher_file to fetch the task script file path
- * 4) Does an execlp on the same in order to replace the current image with
- * task image.
+ * Function used to launch a task as the provided user.
  */
 int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     const char *tt_root) {
-  int exit_code = 0;
+  return run_process_as_user(user, jobid, taskid, tt_root, LAUNCH_TASK_JVM);
+}
 
+/*
+ * Function that is used as a helper to launch task JVMs and debug scripts.
+ * Not meant for launching any other process. It does the following :
+ * 1) Checks if the tt_root passed is found in mapreduce.cluster.local.dir
+ * 2) Prepares attempt_dir and log_dir to be accessible by the task JVMs
+ * 3) Uses get_task_launcher_file to fetch the task script file path
+ * 4) Does an execlp on the same in order to replace the current image with
+ * task image.
+ */
+int run_process_as_user(const char * user, const char * jobid, 
+const char *taskid, const char *tt_root, int command) {
+  if (command != LAUNCH_TASK_JVM && command != RUN_DEBUG_SCRIPT) {
+    return INVALID_COMMAND_PROVIDED;
+  }
   if (jobid == NULL || taskid == NULL || tt_root == NULL) {
     return INVALID_ARGUMENT_NUMBER;
   }
+  
+  if (command == LAUNCH_TASK_JVM) {
+    fprintf(LOGFILE, "run_process_as_user launching a JVM for task :%s.\n", taskid);
+  } else if (command == RUN_DEBUG_SCRIPT) {
+    fprintf(LOGFILE, "run_process_as_user launching a debug script for task :%s.\n", taskid);
+  }
 
 #ifdef DEBUG
-  fprintf(LOGFILE, "Job-id passed to run_task_as_user : %s.\n", jobid);
-  fprintf(LOGFILE, "task-d passed to run_task_as_user : %s.\n", taskid);
-  fprintf(LOGFILE, "tt_root passed to run_task_as_user : %s.\n", tt_root);
+  fprintf(LOGFILE, "Job-id passed to run_process_as_user : %s.\n", jobid);
+  fprintf(LOGFILE, "task-d passed to run_process_as_user : %s.\n", taskid);
+  fprintf(LOGFILE, "tt_root passed to run_process_as_user : %s.\n", tt_root);
 #endif
 
   //Check tt_root before switching the user, as reading configuration
@@ -707,15 +955,17 @@
     return INVALID_TT_ROOT;
   }
 
+  int exit_code = 0;
   char *job_dir = NULL, *task_script_path = NULL;
 
-  if ((exit_code = initialize_task(jobid, taskid, user)) != 0) {
+  if (command == LAUNCH_TASK_JVM && 
+     (exit_code = initialize_task(jobid, taskid, user)) != 0) {
     fprintf(LOGFILE, "Couldn't initialise the task %s of user %s.\n", taskid,
         user);
     goto cleanup;
   }
 
-  job_dir = get_job_directory(tt_root, jobid);
+  job_dir = get_job_directory(tt_root, user, jobid);
   if (job_dir == NULL) {
     fprintf(LOGFILE, "Couldn't obtain job_dir for %s in %s.\n", jobid, tt_root);
     exit_code = OUT_OF_MEMORY;
@@ -748,9 +998,14 @@
   cleanup();
   execlp(task_script_path, task_script_path, NULL);
   if (errno != 0) {
-    fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
     free(task_script_path);
-    exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+    if (command == LAUNCH_TASK_JVM) {
+      fprintf(LOGFILE, "Couldn't execute the task jvm file: %s", strerror(errno));
+      exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT;
+    } else if (command == RUN_DEBUG_SCRIPT) {
+      fprintf(LOGFILE, "Couldn't execute the task debug script file: %s", strerror(errno));
+      exit_code = UNABLE_TO_EXECUTE_DEBUG_SCRIPT;
+    }
   }
 
   return exit_code;
@@ -766,7 +1021,13 @@
   cleanup();
   return exit_code;
 }
-
+/*
+ * Launches a debug script as the provided user via run_process_as_user().
+ */
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+    const char *tt_root) {
+  return run_process_as_user(user, jobid, taskid, tt_root, RUN_DEBUG_SCRIPT);
+}
 /**
  * Function used to terminate/kill a task launched by the user.
  * The function sends appropriate signal to the process group

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.h
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/task-controller.h?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.h (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/task-controller.h Sat Nov 28 20:26:01 2009
@@ -37,11 +37,14 @@
 
 //command definitions
 enum command {
+  INITIALIZE_USER,
   INITIALIZE_JOB,
+  INITIALIZE_DISTRIBUTEDCACHE,
   LAUNCH_TASK_JVM,
   INITIALIZE_TASK,
   TERMINATE_TASK_JVM,
   KILL_TASK_JVM,
+  RUN_DEBUG_SCRIPT,
 };
 
 enum errorcodes {
@@ -63,9 +66,16 @@
   PREPARE_TASK_LOGS_FAILED, //16
   INVALID_TT_LOG_DIR, //17
   OUT_OF_MEMORY, //18
+  INITIALIZE_DISTCACHE_FAILED, //19
+  INITIALIZE_USER_FAILED, //20
+  UNABLE_TO_EXECUTE_DEBUG_SCRIPT, //21
 };
 
-#define TT_JOB_DIR_PATTERN "%s/taskTracker/jobcache/%s"
+#define USER_DIR_PATTERN "%s/taskTracker/%s"
+
+#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN"/jobcache/%s"
+
+#define USER_DISTRIBUTED_CACHE_DIR_PATTERN USER_DIR_PATTERN"/distcache"
 
 #define JOB_DIR_TO_JOB_WORK_PATTERN "%s/work"
 
@@ -75,7 +85,7 @@
 
 #define TASK_SCRIPT_PATTERN "%s/%s/taskjvm.sh"
 
-#define TT_SYS_DIR_KEY "mapred.local.dir"
+#define TT_SYS_DIR_KEY "mapreduce.cluster.local.dir"
 
 #define TT_LOG_DIR_KEY "hadoop.log.dir"
 
@@ -91,10 +101,17 @@
 int run_task_as_user(const char * user, const char *jobid, const char *taskid,
     const char *tt_root);
 
+int run_debug_script_as_user(const char * user, const char *jobid, const char *taskid,
+    const char *tt_root);
+
+int initialize_user(const char *user);
+
 int initialize_task(const char *jobid, const char *taskid, const char *user);
 
 int initialize_job(const char *jobid, const char *user);
 
+int initialize_distributed_cache(const char *user);
+
 int kill_user_task(const char *user, const char *task_pid, int sig);
 
 int prepare_attempt_directory(const char *attempt_dir, const char *user);

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/tests/test-task-controller.c
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/c%2B%2B/task-controller/tests/test-task-controller.c?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/tests/test-task-controller.c (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/c++/task-controller/tests/test-task-controller.c Sat Nov 28 20:26:01 2009
@@ -22,7 +22,7 @@
 int write_config_file(char *file_name) {
   FILE *file;
   char const *str =
-      "mapred.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
+      "mapreduce.cluster.local.dir=/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4\n";
 
   file = fopen(file_name, "w");
   if (file == NULL) {
@@ -67,7 +67,7 @@
   // Test obtaining a value for a key from the config
   char *config_values[4] = { "/tmp/testing1", "/tmp/testing2",
       "/tmp/testing3", "/tmp/testing4" };
-  char *value = (char *) get_value("mapred.local.dir");
+  char *value = (char *) get_value("mapreduce.cluster.local.dir");
   if (strcmp(value, "/tmp/testing1,/tmp/testing2,/tmp/testing3,/tmp/testing4")
       != 0) {
     printf("Obtaining a value for a key from the config failed.\n");
@@ -75,7 +75,7 @@
   }
 
   // Test the parsing of a multiple valued key from the config
-  char **values = (char **)get_values("mapred.local.dir");
+  char **values = (char **)get_values("mapreduce.cluster.local.dir");
   char **values_ptr = values;
   int i = 0;
   while (*values_ptr != NULL) {
@@ -87,12 +87,12 @@
     values_ptr++;
   }
 
-  if (check_variable_against_config("mapred.local.dir", "/tmp/testing5") == 0) {
+  if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing5") == 0) {
     printf("Configuration should not contain /tmp/testing5! \n");
     goto cleanup;
   }
 
-  if (check_variable_against_config("mapred.local.dir", "/tmp/testing4") != 0) {
+  if (check_variable_against_config("mapreduce.cluster.local.dir", "/tmp/testing4") != 0) {
     printf("Configuration should contain /tmp/testing4! \n");
     goto cleanup;
   }
@@ -111,11 +111,24 @@
   rmdir(hadoop_conf_dir);
 }
 
+// Checks that get_user_directory() builds "<tt_root>/taskTracker/<user>"
+// for the given root and user.
+void test_get_user_directory() {
+  char *user_dir = (char *) get_user_directory("/tmp", "user");
+  assert(user_dir != NULL); // a NULL result would crash printf/strcmp below
+  printf("user_dir obtained is %s\n", user_dir);
+  int ret = strcmp(user_dir, "/tmp/taskTracker/user") != 0 ? -1 : 0;
+  free(user_dir); // free before asserting so a failing run does not leak
+  assert(ret == 0);
+}
+
 void test_get_job_directory() {
-  char *job_dir = (char *) get_job_directory("/tmp", "job_200906101234_0001");
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
   printf("job_dir obtained is %s\n", job_dir);
   int ret = 0;
-  if (strcmp(job_dir, "/tmp/taskTracker/jobcache/job_200906101234_0001") != 0) {
+  if (strcmp(job_dir, "/tmp/taskTracker/user/jobcache/job_200906101234_0001")
+      != 0) {
     ret = -1;
   }
   free(job_dir);
@@ -123,30 +136,34 @@
 }
 
 void test_get_attempt_directory() {
-  char *attempt_dir = (char *) get_attempt_directory(
-      "/tmp/taskTracker/jobcache/job_200906101234_0001",
-      "attempt_200906112028_0001_m_000000_0");
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  printf("job_dir obtained is %s\n", job_dir);
+  char *attempt_dir = (char *) get_attempt_directory(job_dir,
+      "attempt_200906101234_0001_m_000000_0");
   printf("attempt_dir obtained is %s\n", attempt_dir);
   int ret = 0;
   if (strcmp(
       attempt_dir,
-      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0")
+      "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906101234_0001_m_000000_0")
       != 0) {
     ret = -1;
   }
+  free(job_dir);
   free(attempt_dir);
   assert(ret == 0);
 }
 
 void test_get_task_launcher_file() {
-  char *task_file = (char *) get_task_launcher_file(
-      "/tmp/taskTracker/jobcache/job_200906101234_0001",
+  char *job_dir = (char *) get_job_directory("/tmp", "user",
+      "job_200906101234_0001");
+  char *task_file = (char *) get_task_launcher_file(job_dir,
       "attempt_200906112028_0001_m_000000_0");
   printf("task_file obtained is %s\n", task_file);
   int ret = 0;
   if (strcmp(
       task_file,
-      "/tmp/taskTracker/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
+      "/tmp/taskTracker/user/jobcache/job_200906101234_0001/attempt_200906112028_0001_m_000000_0/taskjvm.sh")
       != 0) {
     ret = -1;
   }
@@ -168,13 +185,27 @@
 }
 
 int main(int argc, char **argv) {
-  printf("Starting tests\n");
+  printf("\nStarting tests\n");
   LOGFILE = stdout;
+
+  printf("\nTesting check_variable_against_config()\n");
   test_check_variable_against_config();
+
+  printf("\nTesting get_user_directory()\n");
+  test_get_user_directory();
+
+  printf("\nTesting get_job_directory()\n");
   test_get_job_directory();
+
+  printf("\nTesting get_attempt_directory()\n");
   test_get_attempt_directory();
+
+  printf("\nTesting get_task_launcher_file()\n");
   test_get_task_launcher_file();
+
+  printf("\nTesting get_task_log_dir()\n");
   test_get_task_log_dir();
-  printf("Finished tests\n");
+
+  printf("\nFinished tests\n");
   return 0;
 }

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib:713112
 /hadoop/core/trunk/src/contrib:784664-785643
-/hadoop/mapreduce/trunk/src/contrib:804974-807678
+/hadoop/mapreduce/trunk/src/contrib:804974-884916

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/block_forensics/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/hdfs/src/contrib/block_forensics:713112
+/hadoop/mapreduce/trunk/src/contrib/block_forensics:807679-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml Sat Nov 28 20:26:01 2009
@@ -32,6 +32,7 @@
   <property name="hadoop.root" location="${root}/../../../"/>
   <property name="src.dir"  location="${root}/src/java"/>
   <property name="src.test" location="${root}/src/test"/>
+  <property name="src.test.data" location="${root}/src/test/data"/>
   <property name="src.examples" location="${root}/src/examples"/>
 
   <available file="${src.examples}" type="dir" property="examples.available"/>
@@ -45,6 +46,7 @@
   <property name="build.dir" location="${hadoop.root}/build/contrib/${name}"/>
   <property name="build.classes" location="${build.dir}/classes"/>
   <property name="build.test" location="${build.dir}/test"/>
+  <property name="test.build.extraconf" value="${build.test}/extraconf"/>
   <property name="build.examples" location="${build.dir}/examples"/>
   <property name="hadoop.log.dir" location="${build.dir}/test/logs"/>
   <!-- all jars together -->
@@ -56,6 +58,7 @@
             value="http://java.sun.com/j2se/1.4/docs/api/"/>
 
   <property name="build.encoding" value="ISO-8859-1"/>
+  <property name="dest.jar" value="hadoop-${version}-${name}.jar"/>
 
   <fileset id="lib.jars" dir="${root}" includes="lib/*.jar"/>
 
@@ -68,8 +71,7 @@
   <property name="ivy.jar" location="${hadoop.root}/ivy/ivy-${ivy.version}.jar"/>
   <property name="ivy_repo_url" 
 	value="http://repo2.maven.org/maven2/org/apache/ivy/ivy/${ivy.version}/ivy-${ivy.version}.jar" />
-  <property name="build.dir" location="build" />
-  <property name="build.ivy.dir" location="${build.dir}/ivy" />
+  <property name="build.ivy.dir" location="${hadoop.root}/build/ivy" />
   <property name="build.ivy.lib.dir" location="${build.ivy.dir}/lib" />
   <property name="build.ivy.report.dir" location="${build.ivy.dir}/report" />
   <property name="common.ivy.lib.dir" location="${build.ivy.lib.dir}/${ant.project.name}/common"/> 
@@ -83,6 +85,7 @@
     <pathelement location="${build.classes}"/>
     <fileset refid="lib.jars"/>
     <pathelement location="${hadoop.root}/build/classes"/>
+    <pathelement location="${hadoop.root}/build/tools"/>
     <fileset dir="${hadoop.root}/lib">
       <include name="**/*.jar" />
     </fileset>
@@ -93,6 +96,7 @@
   <!-- the unit test classpath -->
   <path id="test.classpath">
     <pathelement location="${build.test}" />
+    <pathelement location="${test.build.extraconf}" />
     <pathelement location="${hadoop.root}/build/test/classes"/>
     <pathelement location="${hadoop.root}/build/test/core/classes"/>
     <pathelement location="${hadoop.root}/build/test/hdfs/classes"/>
@@ -118,6 +122,7 @@
     <mkdir dir="${build.dir}"/>
     <mkdir dir="${build.classes}"/>
     <mkdir dir="${build.test}"/>
+    <mkdir dir="${build.test}/extraconf"/>
     <mkdir dir="${build.examples}"/>
     <mkdir dir="${hadoop.log.dir}"/>
     <antcall target="init-contrib"/>
@@ -144,7 +149,7 @@
   <!-- ======================================================= -->
   <!-- Compile a Hadoop contrib's example files (if available) -->
   <!-- ======================================================= -->
-  <target name="compile-examples" depends="compile" if="examples.available">
+  <target name="compile-examples" depends="compile, ivy-retrieve-common" if="examples.available">
     <echo message="contrib: ${name}"/>
     <javac
      encoding="${build.encoding}"
@@ -179,7 +184,7 @@
   <target name="jar" depends="compile" unless="skip.contrib">
     <echo message="contrib: ${name}"/>
     <jar
-      jarfile="${build.dir}/hadoop-${version}-${name}.jar"
+      jarfile="${build.dir}/${dest.jar}"
       basedir="${build.classes}"      
     />
   </target>
@@ -206,7 +211,7 @@
     <mkdir dir="${dist.dir}/contrib/${name}"/>
     <copy todir="${dist.dir}/contrib/${name}" includeEmptyDirs="false" flatten="true">
       <fileset dir="${build.dir}">
-        <include name="hadoop-${version}-${name}.jar" />
+        <include name="${dest.jar}" />
       </fileset>
     </copy>
   </target>
@@ -220,12 +225,14 @@
     <mkdir dir="${hadoop.log.dir}"/>
     <junit
       printsummary="yes" showoutput="${test.output}" 
-      haltonfailure="no" fork="yes" maxmemory="256m"
+      haltonfailure="no" fork="yes" maxmemory="512m"
       errorProperty="tests.failed" failureProperty="tests.failed"
       timeout="${test.timeout}">
       
       <sysproperty key="test.build.data" value="${build.test}/data"/>
       <sysproperty key="build.test" value="${build.test}"/>
+      <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
+      <sysproperty key="src.test.data" value="${src.test.data}"/>
       <sysproperty key="contrib.name" value="${name}"/>
       
       <!-- requires fork=yes for: 

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build-contrib.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build-contrib.xml:713112
 /hadoop/core/trunk/src/contrib/build-contrib.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/build-contrib.xml:804974-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml Sat Nov 28 20:26:01 2009
@@ -56,6 +56,9 @@
       <fileset dir="." includes="sqoop/build.xml"/>
       <fileset dir="." includes="mrunit/build.xml"/> 
       <fileset dir="." includes="dynamic-scheduler/build.xml"/>
+      <fileset dir="." includes="gridmix/build.xml"/>
+      <fileset dir="." includes="vertica/build.xml"/>
+      <fileset dir="." includes="mumak/build.xml"/>
     </subant>
     <available file="${build.contrib.dir}/testsfailed" property="testsfailed"/>
     <fail if="testsfailed">Tests failed!</fail>

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/build.xml:713112
 /hadoop/core/trunk/src/contrib/build.xml:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/build.xml:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/build.xml:804974-884916

Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/capacity-scheduler:713112
 /hadoop/core/trunk/src/contrib/capacity-scheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler:804974-884916

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/ivy.xml Sat Nov 28 20:26:01 2009
@@ -24,49 +24,35 @@
     <artifact conf="master"/>
   </publications>
   <dependencies>
-    <dependency org="commons-cli"
-      name="commons-cli"
-      rev="${commons-cli.version}"
-      conf="common->default"/>
-    <dependency org="commons-logging"
-      name="commons-logging"
-      rev="${commons-logging.version}"
-      conf="common->default"/>
-   <dependency org="junit"
-      name="junit"
-      rev="${junit.version}"
-      conf="common->default"/>
-    <dependency org="log4j"
-      name="log4j"
-      rev="${log4j.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="jetty-util"
-      rev="${jetty-util.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="jetty"
-      rev="${jetty.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="jsp-api-2.1"
-      rev="${jetty.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="jsp-2.1"
-      rev="${jetty.version}"
-      conf="common->master"/>
-    <dependency org="org.mortbay.jetty"
-      name="servlet-api-2.5"
-      rev="${servlet-api-2.5.version}"
-      conf="common->master"/> 
-    <dependency org="commons-httpclient"
-      name="commons-httpclient"
-      rev="${commons-httpclient.version}"
-      conf="common->master"/> 
-    <dependency org="org.apache.hadoop"
-      name="avro"
-      rev="1.0.0"
-      conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-core" 
+                rev="${hadoop-core.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-core-test" 
+                rev="${hadoop-core.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs" 
+                rev="${hadoop-hdfs.version}" conf="common->default"/>
+    <dependency org="org.apache.hadoop" name="hadoop-hdfs-test" 
+                rev="${hadoop-hdfs.version}" conf="common->default"/>
+    <dependency org="commons-cli" name="commons-cli" 
+                rev="${commons-cli.version}" conf="common->default"/>
+    <dependency org="commons-logging" name="commons-logging" 
+                rev="${commons-logging.version}" conf="common->default"/>
+    <dependency org="junit" name="junit" 
+                rev="${junit.version}" conf="common->default"/>
+    <dependency org="log4j" name="log4j" 
+                rev="${log4j.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="jetty-util" 
+                rev="${jetty-util.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="jetty"
+                rev="${jetty.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="jsp-api-2.1" 
+                rev="${jetty.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="jsp-2.1" 
+	        rev="${jetty.version}" conf="common->master"/>
+    <dependency org="org.mortbay.jetty" name="servlet-api-2.5" 
+                rev="${servlet-api-2.5.version}" conf="common->master"/>
+    <dependency org="commons-httpclient" name="commons-httpclient" 
+                rev="${commons-httpclient.version}" conf="common->master"/>
+    <dependency org="org.apache.hadoop" name="avro" 
+                rev="${avro.version}" conf="common->default"/>
   </dependencies>
 </ivy-module>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Sat Nov 28 20:26:01 2009
@@ -17,21 +17,44 @@
 
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 
+import java.util.Properties;
+import java.util.Map;
+import java.util.HashMap;
+
 /**
- * Class providing access to resource manager configuration.
+ * Class providing access to Capacity scheduler configuration and default values
+ * for queue-configuration. Capacity scheduler configuration includes settings
+ * for the {@link JobInitializationPoller} and default values for queue
+ * configuration. These are read from the file
+ * {@link CapacitySchedulerConf#SCHEDULER_CONF_FILE} on the CLASSPATH. The main
+ * queue configuration is defined in the file
+ * {@link QueueManager#QUEUE_CONF_FILE_NAME} on the CLASSPATH.
+ * 
+ * <p>
  * 
- * Resource manager configuration involves setting up queues, and defining
- * various properties for the queues. These are typically read from a file 
- * called capacity-scheduler.xml that must be in the classpath of the
- * application. The class provides APIs to get/set and reload the 
- * configuration for the queues.
+ * This class also provides APIs to get and set the configuration for the
+ * queues.
  */
 class CapacitySchedulerConf {
-  
-  /** Default file name from which the resource manager configuration is read. */ 
+
+  static final Log LOG = LogFactory.getLog(CapacitySchedulerConf.class);
+
+  static final String CAPACITY_PROPERTY = "capacity";
+
+  static final String SUPPORTS_PRIORITY_PROPERTY = "supports-priority";
+
+  static final String MAXIMUM_INITIALIZED_JOBS_PER_USER_PROPERTY =
+      "maximum-initialized-jobs-per-user";
+
+  static final String MINIMUM_USER_LIMIT_PERCENT_PROPERTY =
+      "minimum-user-limit-percent";
+
+  /** Default file name from which the capacity scheduler configuration is read. */
   public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
   
   private int defaultUlimitMinimum;
@@ -41,6 +64,9 @@
   private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = 
     "mapred.capacity-scheduler.queue.";
 
+  private Map<String, Properties> queueProperties
+    = new HashMap<String,Properties>();
+
   /**
    * If {@link JobConf#MAPRED_TASK_MAXPMEM_PROPERTY} is set to
    * {@link JobConf#DISABLED_MEMORY_LIMIT}, this configuration will be used to
@@ -74,18 +100,12 @@
   @Deprecated
   static final String UPPER_LIMIT_ON_TASK_PMEM_PROPERTY =
     "mapred.capacity-scheduler.task.limit.maxpmem";
-
-  /**
-   *  Configuration that provides the maximum cap for the map task in a queue
-   *  at any given point of time.
-   */
-  static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
-
+  
   /**
-   *  Configuration that provides the maximum cap for the reduce task in a queue
-   *  at any given point of time.
+   * A maximum capacity defines a limit beyond which a sub-queue
+   * cannot use the capacity of its parent queue.
    */
-  static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
+  static final String MAX_CAPACITY_PROPERTY ="maximum-capacity";
 
   /**
    * The constant which defines the default initialization thread
@@ -102,29 +122,29 @@
   private Configuration rmConf;
 
   private int defaultMaxJobsPerUsersToInitialize;
-  
+
   /**
-   * Create a new ResourceManagerConf.
+   * Create a new CapacitySchedulerConf.
    * This method reads from the default configuration file mentioned in
-   * {@link RM_CONF_FILE}, that must be present in the classpath of the
+   * {@link #SCHEDULER_CONF_FILE}, that must be present in the classpath of the
    * application.
    */
   public CapacitySchedulerConf() {
     rmConf = new Configuration(false);
-    rmConf.addResource(SCHEDULER_CONF_FILE);
+    getCSConf().addResource(SCHEDULER_CONF_FILE);
     initializeDefaults();
   }
 
   /**
-   * Create a new ResourceManagerConf reading the specified configuration
+   * Create a new CapacitySchedulerConf reading the specified configuration
    * file.
    * 
    * @param configFile {@link Path} to the configuration file containing
-   * the resource manager configuration.
+   * the capacity scheduler configuration.
    */
   public CapacitySchedulerConf(Path configFile) {
     rmConf = new Configuration(false);
-    rmConf.addResource(configFile);
+    getCSConf().addResource(configFile);
     initializeDefaults();
   }
   
@@ -133,14 +153,18 @@
    * which is used by the Capacity Scheduler.
    */
   private void initializeDefaults() {
-    defaultUlimitMinimum = rmConf.getInt(
+    defaultUlimitMinimum = getCSConf().getInt(
         "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
-    defaultSupportPriority = rmConf.getBoolean(
+    defaultSupportPriority = getCSConf().getBoolean(
         "mapred.capacity-scheduler.default-supports-priority", false);
-    defaultMaxJobsPerUsersToInitialize = rmConf.getInt(
+    defaultMaxJobsPerUsersToInitialize = getCSConf().getInt(
         "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user",
         2);
   }
+
+  void setProperties(String queueName , Properties properties) {
+    this.queueProperties.put(queueName,properties);
+  }
   
   /**
    * Get the percentage of the cluster for the specified queue.
@@ -163,31 +187,60 @@
     //In case of both capacity and default capacity not configured.
     //Last check is if the configuration is specified and is marked as
     //negative we throw exception
-    String raw = rmConf.getRaw(toFullPropertyName(queue, 
-        "capacity"));
-    if(raw == null) {
-      return -1;
-    }
-    float result = rmConf.getFloat(toFullPropertyName(queue, 
-                                   "capacity"), 
-                                   -1);
-    if (result < 0.0 || result > 100.0) {
+    String raw = getProperty(queue, CAPACITY_PROPERTY);
+
+    float result = this.getFloat(raw,-1);
+    
+    if (result > 100.0) {
       throw new IllegalArgumentException("Illegal capacity for queue " + queue +
                                          " of " + result);
     }
     return result;
   }
-  
+
+  String getProperty(String queue,String property) {
+    if(!queueProperties.containsKey(queue))
+     throw new IllegalArgumentException("Invalid queuename " + queue);
+
+    //This check is still required because a queue is sometimes registered
+    //with null properties. This typically happens in tests.
+    if(queueProperties.get(queue) != null) {
+      return queueProperties.get(queue).getProperty(property);
+    }
+
+    return null;
+  }
+
   /**
-   * Sets the capacity of the given queue.
+   * Return the maximum percentage of the cluster capacity that can be
+   * used by the given queue.
+   * This percentage defines a limit beyond which a
+   * sub-queue cannot use the capacity of its parent queue.
+   * This provides a means to limit how much excess capacity a
+   * sub-queue can use. By default, there is no limit.
+   *
+   * The maximum-capacity of a queue can only be
+   * greater than or equal to its capacity.
    * 
    * @param queue name of the queue
-   * @param capacity percent of the cluster for the queue.
+   * @return maximum capacity percent of cluster for the queue
    */
-  public void setCapacity(String queue,float capacity) {
-    rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity);
+  public float getMaxCapacity(String queue) {
+    String raw = getProperty(queue, MAX_CAPACITY_PROPERTY);
+    float result = getFloat(raw,-1);
+    result = (result <= 0) ? -1 : result; 
+    if (result > 100.0) {
+      throw new IllegalArgumentException("Illegal maximum-capacity-stretch " +
+        "for queue " + queue +" of " + result);
+    }
+
+    if((result != -1) && (result < getCapacity(queue))) {
+      throw new IllegalArgumentException("maximum-capacity-stretch " +
+        "for a queue should be greater than capacity ");
+    }
+    return result;
   }
-  
+
   /**
    * Get whether priority is supported for this queue.
    * 
@@ -198,21 +251,10 @@
    * @return Whether this queue supports priority or not.
    */
   public boolean isPrioritySupported(String queue) {
-    return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"),
-        defaultSupportPriority);  
-  }
-  
-  /**
-   * Set whether priority is supported for this queue.
-   * 
-   * 
-   * @param queue name of the queue
-   * @param value true, if the queue must support priorities, false otherwise.
-   */
-  public void setPrioritySupported(String queue, boolean value) {
-    rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value);
+    String raw = getProperty(queue, SUPPORTS_PRIORITY_PROPERTY);
+    return Boolean.parseBoolean(raw);
   }
-  
+
   /**
    * Get the minimum limit of resources for any user submitting jobs in 
    * this queue, in percentage.
@@ -229,8 +271,8 @@
    * 
    */
   public int getMinimumUserLimitPercent(String queue) {
-    int userLimit = rmConf.getInt(toFullPropertyName(queue,
-        "minimum-user-limit-percent"), defaultUlimitMinimum);
+    String raw = getProperty(queue, MINIMUM_USER_LIMIT_PERCENT_PROPERTY);
+    int userLimit = getInt(raw,defaultUlimitMinimum);
     if(userLimit <= 0 || userLimit > 100) {
       throw new IllegalArgumentException("Invalid user limit : "
           + userLimit + " for queue : " + queue);
@@ -238,28 +280,6 @@
     return userLimit;
   }
   
-  /**
-   * Set the minimum limit of resources for any user submitting jobs in
-   * this queue, in percentage.
-   * 
-   * @param queue name of the queue
-   * @param value minimum limit of resources for any user submitting jobs
-   * in this queue
-   */
-  public void setMinimumUserLimitPercent(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), 
-                    value);
-  }
-  
-  /**
-   * Reload configuration by clearing the information read from the 
-   * underlying configuration file.
-   */
-  public synchronized void reloadConfiguration() {
-    rmConf.reloadConfiguration();
-    initializeDefaults();
-  }
-  
   static final String toFullPropertyName(String queue, 
                                                   String property) {
       return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property;
@@ -275,27 +295,16 @@
    * or zero.
    */
   public int getMaxJobsPerUserToInitialize(String queue) {
-    int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue,
-        "maximum-initialized-jobs-per-user"), 
-        defaultMaxJobsPerUsersToInitialize);
+    String raw =
+        getProperty(queue, MAXIMUM_INITIALIZED_JOBS_PER_USER_PROPERTY);
+    int maxJobsPerUser = getInt(raw,defaultMaxJobsPerUsersToInitialize);
     if(maxJobsPerUser <= 0) {
       throw new IllegalArgumentException(
           "Invalid maximum jobs per user configuration " + maxJobsPerUser);
     }
     return maxJobsPerUser;
   }
-  
-  /**
-   * Sets the maximum number of jobs which are allowed to be initialized 
-   * for a user in the queue.
-   * 
-   * @param queue queue name.
-   * @param value maximum number of jobs allowed to be initialized per user.
-   */
-  public void setMaxJobsPerUserToInitialize(String queue, int value) {
-    rmConf.setInt(toFullPropertyName(queue, 
-        "maximum-initialized-jobs-per-user"), value);
-  }
+
 
   /**
    * Amount of time in milliseconds which poller thread and initialization
@@ -308,7 +317,7 @@
    * @throws IllegalArgumentException if time is negative or zero.
    */
   public long getSleepInterval() {
-    long sleepInterval = rmConf.getLong(
+    long sleepInterval = getCSConf().getLong(
         "mapred.capacity-scheduler.init-poll-interval", 
         INITIALIZATION_THREAD_POLLING_INTERVAL);
     
@@ -338,7 +347,7 @@
    * in parallel.
    */
   public int getMaxWorkerThreads() {
-    int maxWorkerThreads = rmConf.getInt(
+    int maxWorkerThreads = getCSConf().getInt(
         "mapred.capacity-scheduler.init-worker-threads", 
         MAX_INITIALIZATION_WORKER_THREADS);
     if(maxWorkerThreads <= 0) {
@@ -347,62 +356,28 @@
     }
     return maxWorkerThreads;
   }
-  /**
-   * Set the sleep interval which initialization poller would sleep before 
-   * it looks at the jobs in the job queue.
-   * 
-   * @param interval sleep interval
-   */
-  public void setSleepInterval(long interval) {
-    rmConf.setLong(
-        "mapred.capacity-scheduler.init-poll-interval", interval);
-  }
-  
-  /**
-   * Sets number of threads which can be spawned to initialize jobs in
-   * parallel.
-   * 
-   * @param poolSize number of threads to be spawned to initialize jobs
-   * in parallel.
-   */
-  public void setMaxWorkerThreads(int poolSize) {
-    rmConf.setInt(
-        "mapred.capacity-scheduler.init-worker-threads", poolSize);
-  }
 
-  /**
-   * get the max map slots cap
-   * @param queue
-   * @return
-   */
-  public int getMaxMapCap(String queue) {
-    return rmConf.getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
+  public Configuration getCSConf() {
+    return rmConf;
   }
 
-  /**
-   * Used for testing
-   * @param queue
-   * @param val
-   */
-  public void setMaxMapCap(String queue,int val) {
-    rmConf.setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
-  }
-
-  /**
-   * get the max reduce slots cap
-   * @param queue
-   * @return
-   */
-  public int getMaxReduceCap(String queue) {
-    return rmConf.getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);    
+  float getFloat(String valueString,float defaultValue) {
+    if (valueString == null)
+      return defaultValue;
+    try {
+      return Float.parseFloat(valueString);
+    } catch (NumberFormatException e) {
+      return defaultValue;
+    }
   }
 
-  /**
-   * Used for testing
-   * @param queue
-   * @param val
-   */
-  public void setMaxReduceCap(String queue,int val) {
-    rmConf.setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
+  int getInt(String valueString,int defaultValue) {
+    if (valueString == null)
+      return defaultValue;
+    try {
+      return Integer.parseInt(valueString);
+    } catch (NumberFormatException e) {
+      return defaultValue;
+    }
   }
 }