Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/04/16 23:44:46 UTC
svn commit: r529410 [2/27] - in /lucene/hadoop/trunk: ./
src/contrib/abacus/src/examples/org/apache/hadoop/abacus/examples/
src/contrib/abacus/src/java/org/apache/hadoop/abacus/
src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/c...
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Mon Apr 16 14:44:35 2007
@@ -87,9 +87,9 @@
// need these two at class level to extract values later from
// commons-cli command line
private MultiPropertyOption jobconf = new MultiPropertyOption(
- "-jobconf", "(n=v) Optional. Add or override a JobConf property.", 'D');
+ "-jobconf", "(n=v) Optional. Add or override a JobConf property.", 'D');
private MultiPropertyOption cmdenv = new MultiPropertyOption(
- "-cmdenv", "(n=v) Pass env.var to streaming commands.", 'E');
+ "-cmdenv", "(n=v) Pass env.var to streaming commands.", 'E');
public StreamJob(String[] argv, boolean mayExit) {
setupOptions();
@@ -199,7 +199,7 @@
void parseArgv(){
CommandLine cmdLine = null ;
try{
- cmdLine = parser.parse(argv_);
+ cmdLine = parser.parse(argv_);
}catch(Exception oe){
LOG.error(oe.getMessage());
if (detailedUsage_) {
@@ -288,36 +288,36 @@
}
private Option createOption(String name, String desc,
- String argName, int max, boolean required){
+ String argName, int max, boolean required){
Argument argument = argBuilder.
- withName(argName).
- withMinimum(1).
- withMaximum(max).
- create();
+ withName(argName).
+ withMinimum(1).
+ withMaximum(max).
+ create();
return builder.
- withLongName(name).
- withArgument(argument).
- withDescription(desc).
- withRequired(required).
- create();
+ withLongName(name).
+ withArgument(argument).
+ withDescription(desc).
+ withRequired(required).
+ create();
}
private Option createOption(String name, String desc,
- String argName, int max, boolean required, Validator validator){
+ String argName, int max, boolean required, Validator validator){
Argument argument = argBuilder.
- withName(argName).
- withMinimum(1).
- withMaximum(max).
- withValidator(validator).
- create() ;
+ withName(argName).
+ withMinimum(1).
+ withMaximum(max).
+ withValidator(validator).
+ create() ;
return builder.
- withLongName(name).
- withArgument(argument).
- withDescription(desc).
- withRequired(required).
- create();
+ withLongName(name).
+ withArgument(argument).
+ withDescription(desc).
+ withRequired(required).
+ create();
}
private Option createBoolOption(String name, String desc){
@@ -327,82 +327,82 @@
private void setupOptions(){
final Validator fileValidator = new Validator(){
- public void validate(final List values) throws InvalidArgumentException {
- // Note: This code doesn't belong here; it should be changed to
- // a can-exec check in Java 6
- for (String file : (List<String>)values) {
- File f = new File(file);
- if ( ! f.exists() ) {
- throw new InvalidArgumentException("Argument : " +
- f.getAbsolutePath() + " doesn't exist.");
- }
- if ( ! f.isFile() ) {
- throw new InvalidArgumentException("Argument : " +
- f.getAbsolutePath() + " is not a file.");
- }
- if ( ! f.canRead() ) {
- throw new InvalidArgumentException("Argument : " +
- f.getAbsolutePath() + " is not accessible");
+ public void validate(final List values) throws InvalidArgumentException {
+ // Note: This code doesn't belong here; it should be changed to
+ // a can-exec check in Java 6
+ for (String file : (List<String>)values) {
+ File f = new File(file);
+ if ( ! f.exists() ) {
+ throw new InvalidArgumentException("Argument : " +
+ f.getAbsolutePath() + " doesn't exist.");
+ }
+ if ( ! f.isFile() ) {
+ throw new InvalidArgumentException("Argument : " +
+ f.getAbsolutePath() + " is not a file.");
+ }
+ if ( ! f.canRead() ) {
+ throw new InvalidArgumentException("Argument : " +
+ f.getAbsolutePath() + " is not accessible");
+ }
}
- }
- }
- };
+ }
+ };
// Note: not extending CLI2's FileValidator, which overwrites
// the String arg with a File and causes a ClassCastException
// in the inheritance tree.
final Validator execValidator = new Validator(){
- public void validate(final List values) throws InvalidArgumentException {
- // Note: This code doesn't belong here; it should be changed to
- // a can-exec check in Java 6
- for (String file : (List<String>)values) {
- try{
- Runtime.getRuntime().exec("chmod 0777 " + (new File(file)).getAbsolutePath());
- }catch(IOException ioe){
- // ignore
+ public void validate(final List values) throws InvalidArgumentException {
+ // Note: This code doesn't belong here; it should be changed to
+ // a can-exec check in Java 6
+ for (String file : (List<String>)values) {
+ try{
+ Runtime.getRuntime().exec("chmod 0777 " + (new File(file)).getAbsolutePath());
+ }catch(IOException ioe){
+ // ignore
+ }
}
- }
- fileValidator.validate(values);
- }
- };
+ fileValidator.validate(values);
+ }
+ };
Option input = createOption("input",
- "DFS input file(s) for the Map step",
- "path",
- Integer.MAX_VALUE,
- true);
+ "DFS input file(s) for the Map step",
+ "path",
+ Integer.MAX_VALUE,
+ true);
Option output = createOption("output",
- "DFS output directory for the Reduce step",
- "path", 1, true);
+ "DFS output directory for the Reduce step",
+ "path", 1, true);
Option mapper = createOption("mapper",
- "The streaming command to run", "cmd", 1, false);
+ "The streaming command to run", "cmd", 1, false);
Option combiner = createOption("combiner",
- "The streaming command to run", "cmd",1, false);
+ "The streaming command to run", "cmd",1, false);
// reducer could be NONE
Option reducer = createOption("reducer",
- "The streaming command to run", "cmd", 1, false);
+ "The streaming command to run", "cmd", 1, false);
Option file = createOption("file",
- "File/dir to be shipped in the Job jar file",
- "file", Integer.MAX_VALUE, false, execValidator);
+ "File/dir to be shipped in the Job jar file",
+ "file", Integer.MAX_VALUE, false, execValidator);
Option dfs = createOption("dfs",
- "Optional. Override DFS configuration", "<h:p>|local", 1, false);
+ "Optional. Override DFS configuration", "<h:p>|local", 1, false);
Option jt = createOption("jt",
- "Optional. Override JobTracker configuration", "<h:p>|local",1, false);
+ "Optional. Override JobTracker configuration", "<h:p>|local",1, false);
Option additionalconfspec = createOption("additionalconfspec",
- "Optional.", "spec",1, false );
+ "Optional.", "spec",1, false );
Option inputformat = createOption("inputformat",
- "Optional.", "spec",1, false );
+ "Optional.", "spec",1, false );
Option outputformat = createOption("outputformat",
- "Optional.", "spec",1, false );
+ "Optional.", "spec",1, false );
Option partitioner = createOption("partitioner",
- "Optional.", "spec",1, false );
+ "Optional.", "spec",1, false );
Option inputreader = createOption("inputreader",
- "Optional.", "spec",1, false );
+ "Optional.", "spec",1, false );
Option cacheFile = createOption("cacheFile",
- "File name URI", "fileNameURI", 1, false);
+ "File name URI", "fileNameURI", 1, false);
Option cacheArchive = createOption("cacheArchive",
- "File name URI", "fileNameURI",1, false);
+ "File name URI", "fileNameURI",1, false);
// boolean properties
@@ -413,29 +413,29 @@
Option inputtagged = createBoolOption("inputtagged", "inputtagged");
allOptions = new GroupBuilder().
- withOption(input).
- withOption(output).
- withOption(mapper).
- withOption(combiner).
- withOption(reducer).
- withOption(file).
- withOption(dfs).
- withOption(jt).
- withOption(additionalconfspec).
- withOption(inputformat).
- withOption(outputformat).
- withOption(partitioner).
- withOption(inputreader).
- withOption(jobconf).
- withOption(cmdenv).
- withOption(cacheFile).
- withOption(cacheArchive).
- withOption(verbose).
- withOption(info).
- withOption(debug).
- withOption(inputtagged).
- withOption(help).
- create();
+ withOption(input).
+ withOption(output).
+ withOption(mapper).
+ withOption(combiner).
+ withOption(reducer).
+ withOption(file).
+ withOption(dfs).
+ withOption(jt).
+ withOption(additionalconfspec).
+ withOption(inputformat).
+ withOption(outputformat).
+ withOption(partitioner).
+ withOption(inputreader).
+ withOption(jobconf).
+ withOption(cmdenv).
+ withOption(cacheFile).
+ withOption(cacheArchive).
+ withOption(verbose).
+ withOption(info).
+ withOption(debug).
+ withOption(inputtagged).
+ withOption(help).
+ create();
parser.setGroup(allOptions);
}
@@ -478,7 +478,7 @@
System.out.println(" the key part ends at first TAB, the rest of the line is the value");
System.out.println("Custom Map input format: -inputreader package.MyRecordReader,n=v,n=v ");
System.out
- .println(" comma-separated name-values can be specified to configure the InputFormat");
+ .println(" comma-separated name-values can be specified to configure the InputFormat");
System.out.println(" Ex: -inputreader 'StreamXmlRecordReader,begin=<doc>,end=</doc>'");
System.out.println("Map output format, reduce input/output format:");
System.out.println(" Format defined by what the mapper command outputs. Line-oriented");
@@ -495,9 +495,9 @@
System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
System.out.println(" Use -reducer " + REDUCE_NONE);
System.out
- .println(" A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
+ .println(" A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
System.out
- .println("  This speeds up processing. This also feels more like \"in-place\" processing");
+ .println("  This speeds up processing. This also feels more like \"in-place\" processing");
System.out.println(" because the input filename and the map input order are preserved");
System.out.println("To specify a single side-effect output file");
System.out.println("  -mapsideoutput [file:/C:/win|file:/unix/|socket://host:port]");//-output for side-effects will soon be deprecated
@@ -513,7 +513,7 @@
System.out.println(" -jobconf mapred.job.name='My Job' ");
System.out.println("To specify that line-oriented input is in gzip format:");
System.out
- .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
+ .println("(at this time ALL input files must be gzipped and this is not recognized based on file extension)");
System.out.println(" -jobconf stream.recordreader.compression=gzip ");
System.out.println("To change the local temp directory:");
System.out.println(" -jobconf dfs.data.dir=/tmp/dfs");
@@ -525,7 +525,7 @@
System.out.println("Use a custom hadoopStreaming build along a standard hadoop install:");
System.out.println(" $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\\");
System.out
- .println(" [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
+ .println(" [...] -jobconf stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
System.out.println("For more details about jobconf parameters see:");
System.out.println(" http://wiki.apache.org/lucene-hadoop/JobConfFile");
System.out.println("To set an environment variable in a streaming command:");
@@ -533,7 +533,7 @@
System.out.println();
System.out.println("Shortcut:");
System.out
- .println(" setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar\"");
+ .println(" setenv HSTREAMING \"$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar\"");
System.out.println();
System.out.println("Example: $HSTREAMING -mapper \"/usr/local/bin/perl5 filter.pl\"");
System.out.println(" -file /local/filter.pl -input \"/logs/0604*/*\" [...]");
@@ -619,7 +619,7 @@
// tmpDir=null means OS default tmp dir
File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar
- + " tmpDir=" + tmpDir);
+ + " tmpDir=" + tmpDir);
if (debug_ == 0) {
jobJar.deleteOnExit();
}
@@ -709,14 +709,14 @@
.compareToIgnoreCase("org.apache.hadoop.mapred.KeyValueTextInputFormat") == 0)) {
fmt = KeyValueTextInputFormat.class;
} else if ((inputFormatSpec_
- .compareToIgnoreCase("SequenceFileInputFormat") == 0)
- || (inputFormatSpec_
- .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
+ .compareToIgnoreCase("SequenceFileInputFormat") == 0)
+ || (inputFormatSpec_
+ .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileInputFormat") == 0)) {
fmt = SequenceFileInputFormat.class;
} else if ((inputFormatSpec_
- .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
- || (inputFormatSpec_
- .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
+ .compareToIgnoreCase("SequenceFileToLineInputFormat") == 0)
+ || (inputFormatSpec_
+ .compareToIgnoreCase("org.apache.hadoop.mapred.SequenceFileToLineInputFormat") == 0)) {
fmt = SequenceFileAsTextInputFormat.class;
} else {
c = StreamUtil.goodClassOrNull(inputFormatSpec_, defaultPackage);
@@ -955,7 +955,7 @@
String hp = getJobTrackerHostPort();
LOG.info("To kill this job, run:");
LOG.info(getHadoopClientHome() + "/bin/hadoop job -Dmapred.job.tracker=" + hp + " -kill "
- + jobId_);
+ + jobId_);
//LOG.info("Job file: " + running_.getJobFile() );
LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
}
@@ -991,7 +991,7 @@
running_ = jc_.getJob(jobId_);
String report = null;
report = " map " + Math.round(running_.mapProgress() * 100) + "% reduce "
- + Math.round(running_.reduceProgress() * 100) + "%";
+ + Math.round(running_.reduceProgress() * 100) + "%";
if (!report.equals(lastReport)) {
LOG.info(report);
@@ -1006,16 +1006,16 @@
LOG.info("Output: " + output_);
error = false;
} catch(FileNotFoundException fe){
- LOG.error("Error launching job , bad input path : " + fe.getMessage());
- }catch(InvalidJobConfException je){
- LOG.error("Error launching job , Invalid job conf : " + je.getMessage());
- }catch(FileAlreadyExistsException fae){
- LOG.error("Error launching job , Output path already exists : "
- + fae.getMessage());
- }catch( IOException ioe){
- LOG.error("Error Launching job : " + ioe.getMessage());
- }
- finally {
+ LOG.error("Error launching job , bad input path : " + fe.getMessage());
+ }catch(InvalidJobConfException je){
+ LOG.error("Error launching job , Invalid job conf : " + je.getMessage());
+ }catch(FileAlreadyExistsException fae){
+ LOG.error("Error launching job , Output path already exists : "
+ + fae.getMessage());
+ }catch( IOException ioe){
+ LOG.error("Error Launching job : " + ioe.getMessage());
+ }
+ finally {
if (error && (running_ != null)) {
LOG.info("killJob...");
running_.killJob();
@@ -1031,25 +1031,25 @@
}
MultiPropertyOption(final String optionString,
- final String description,
- final int id){
+ final String description,
+ final int id){
super(optionString, description, id) ;
this.optionString = optionString;
}
public boolean canProcess(final WriteableCommandLine commandLine,
- final String argument) {
- boolean ret = (argument != null) && argument.startsWith(optionString);
+ final String argument) {
+ boolean ret = (argument != null) && argument.startsWith(optionString);
- return ret;
+ return ret;
}
public void process(final WriteableCommandLine commandLine,
- final ListIterator arguments) throws OptionException {
+ final ListIterator arguments) throws OptionException {
final String arg = (String) arguments.next();
if (!canProcess(commandLine, arg)) {
- throw new OptionException(this,
- ResourceConstants.UNEXPECTED_TOKEN, arg);
+ throw new OptionException(this,
+ ResourceConstants.UNEXPECTED_TOKEN, arg);
}
ArrayList properties = new ArrayList();
@@ -1127,5 +1127,5 @@
protected RunningJob running_;
protected String jobId_;
protected static String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
- "Please specify a different link name for all of your caching URIs";
+ "Please specify a different link name for all of your caching URIs";
}
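A note on usage: the options wired up above are consumed from a flat argv array, in the same style the test classes later in this commit build theirs. Below is a minimal sketch assuming only the StreamJob(String[], boolean) constructor shown above; the class name, paths, and values are hypothetical, and the call that actually submits the job is elided since it is not part of this hunk.

    import org.apache.hadoop.streaming.StreamJob;

    public class StreamJobSketch {
      public static void main(String[] args) {
        String[] argv = new String[] {
            "-input", "/logs/input",              // hypothetical DFS input
            "-output", "/logs/output",            // hypothetical DFS output dir
            "-mapper", "/bin/cat",
            "-jobconf", "mapred.job.name=demo",   // consumed by the 'D' MultiPropertyOption
            "-cmdenv", "LANG=C"                   // consumed by the 'E' MultiPropertyOption
        };
        StreamJob job = new StreamJob(argv, false);  // mayExit = false
      }
    }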
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamLineRecordReader.java Mon Apr 16 14:44:35 2007
@@ -32,7 +32,7 @@
public class StreamLineRecordReader extends KeyValueLineRecordReader {
public StreamLineRecordReader(Configuration job, FileSplit split)
- throws IOException {
+ throws IOException {
super(job, split);
}
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamSequenceRecordReader.java Mon Apr 16 14:44:35 2007
@@ -32,7 +32,7 @@
public class StreamSequenceRecordReader extends SequenceFileRecordReader {
public StreamSequenceRecordReader(Configuration conf, FileSplit split)
- throws IOException {
+ throws IOException {
super(conf, split);
}
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamUtil.java Mon Apr 16 14:44:35 2007
@@ -90,7 +90,7 @@
int pos = codePath.lastIndexOf(relPath);
if (pos == -1) {
throw new IllegalArgumentException("invalid codePath: className=" + className
- + " codePath=" + codePath);
+ + " codePath=" + codePath);
}
codePath = codePath.substring(0, pos);
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java Mon Apr 16 14:44:35 2007
@@ -47,7 +47,7 @@
public class StreamXmlRecordReader extends StreamBaseRecordReader {
public StreamXmlRecordReader(FSDataInputStream in, FileSplit split, Reporter reporter,
- JobConf job, FileSystem fs) throws IOException {
+ JobConf job, FileSystem fs) throws IOException {
super(in, split, reporter, job, fs);
beginMark_ = checkJobGet(CONF_NS + "begin");
@@ -67,8 +67,8 @@
public void init() throws IOException {
LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
- + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
- + in_.getPos());
+ + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
+ + in_.getPos());
if (start_ > in_.getPos()) {
in_.seek(start_);
}
@@ -102,9 +102,9 @@
((Text) value).set("");
/*if(numNext < 5) {
- System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
- + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|");
- }*/
+ System.out.println("@@@ " + numNext + ". true next k=|" + key.toString().replaceAll("[\\r\\n]", " ")
+ + "|, len=" + buf.length() + " v=|" + value.toString().replaceAll("[\\r\\n]", " ") + "|");
+ }*/
return true;
}
@@ -130,7 +130,7 @@
}
private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
- DataOutputBuffer outBufOrNull) throws IOException {
+ DataOutputBuffer outBufOrNull) throws IOException {
try {
long inStart = in_.getPos();
byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
@@ -168,10 +168,10 @@
}
state = nextState(state, input, match.start());
/*System.out.println("@@@" +
- s + ". Match " + match.start() + " " + match.groupCount() +
- " state=" + state + " input=" + input +
- " firstMatchStart_=" + firstMatchStart_ + " startinstream=" + (inStart+firstMatchStart_) +
- " match=" + match.group(0) + " in=" + in_.getPos());*/
+ s + ". Match " + match.start() + " " + match.groupCount() +
+ " state=" + state + " input=" + input +
+ " firstMatchStart_=" + firstMatchStart_ + " startinstream=" + (inStart+firstMatchStart_) +
+ " match=" + match.group(0) + " in=" + in_.getPos());*/
if (state == RECORD_ACCEPT) {
break;
}
@@ -230,7 +230,7 @@
case RECORD_MAYBE:
return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
}
- break;
+ break;
case CDATA_IN:
return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Mon Apr 16 14:44:35 2007
@@ -30,124 +30,124 @@
*/
public class UTF8ByteArrayUtils {
- /**
- * Find the first tab in a UTF-8 encoded string
- * @param utf a byte array containing a UTF-8 encoded string
- * @param start starting offset
- * @param length no. of bytes
- * @return position where the first tab occurs, otherwise -1
- */
- public static int findTab(byte [] utf, int start, int length) {
- for(int i=start; i<(start+length); i++) {
- if(utf[i]==(byte)'\t') {
- return i;
- }
- }
- return -1;
+ /**
+ * Find the first tab in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @param start starting offset
+ * @param length no. of bytes
+ * @return position where the first tab occurs, otherwise -1
+ */
+ public static int findTab(byte [] utf, int start, int length) {
+ for(int i=start; i<(start+length); i++) {
+ if(utf[i]==(byte)'\t') {
+ return i;
+ }
}
+ return -1;
+ }
- /**
- * Find the first tab in a UTF-8 encoded string
- * @param utf a byte array containing a UTF-8 encoded string
- * @return position where the first tab occurs, otherwise -1
- */
- public static int findTab(byte [] utf) {
- return findTab(utf, 0, utf.length);
- }
+ /**
+ * Find the first tab in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @return position where the first tab occurs, otherwise -1
+ */
+ public static int findTab(byte [] utf) {
+ return findTab(utf, 0, utf.length);
+ }
- /**
- * Split a UTF-8 byte array into key and value,
- * assuming that the delimiter is at splitPos.
- * @param utf utf-8 encoded string
- * @param start starting offset
- * @param length no. of bytes
- * @param key contains the key when the method returns
- * @param val contains the value when the method returns
- * @param splitPos the split pos
- * @throws IOException
- */
- public static void splitKeyVal(byte[] utf, int start, int length,
- Text key, Text val, int splitPos) throws IOException {
- if(splitPos<start || splitPos >= (start+length))
- throw new IllegalArgumentException( "splitPos must be in the range " +
- "[" + start + ", " + (start+length) + "]: " + splitPos);
- int keyLen = (splitPos-start);
- byte [] keyBytes = new byte[keyLen];
- System.arraycopy(utf, start, keyBytes, 0, keyLen);
- int valLen = (start+length)-splitPos-1;
- byte [] valBytes = new byte[valLen];
- System.arraycopy(utf, splitPos+1, valBytes, 0, valLen);
- key.set(keyBytes);
- val.set(valBytes);
- }
+ /**
+ * Split a UTF-8 byte array into key and value,
+ * assuming that the delimiter is at splitPos.
+ * @param utf utf-8 encoded string
+ * @param start starting offset
+ * @param length no. of bytes
+ * @param key contains the key when the method returns
+ * @param val contains the value when the method returns
+ * @param splitPos the split pos
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, int start, int length,
+ Text key, Text val, int splitPos) throws IOException {
+ if(splitPos<start || splitPos >= (start+length))
+ throw new IllegalArgumentException( "splitPos must be in the range " +
+ "[" + start + ", " + (start+length) + "]: " + splitPos);
+ int keyLen = (splitPos-start);
+ byte [] keyBytes = new byte[keyLen];
+ System.arraycopy(utf, start, keyBytes, 0, keyLen);
+ int valLen = (start+length)-splitPos-1;
+ byte [] valBytes = new byte[valLen];
+ System.arraycopy(utf, splitPos+1, valBytes, 0, valLen);
+ key.set(keyBytes);
+ val.set(valBytes);
+ }
- /**
- * Split a UTF-8 byte array into key and value,
- * assuming that the delimiter is at splitPos.
- * @param utf utf-8 encoded string
- * @param key contains the key when the method returns
- * @param val contains the value when the method returns
- * @param splitPos the split pos
- * @throws IOException
- */
- public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos)
+ /**
+ * Split a UTF-8 byte array into key and value,
+ * assuming that the delimiter is at splitPos.
+ * @param utf utf-8 encoded string
+ * @param key contains the key when the method returns
+ * @param val contains the value when the method returns
+ * @param splitPos the split pos
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos)
throws IOException {
- splitKeyVal(utf, 0, utf.length, key, val, splitPos);
- }
+ splitKeyVal(utf, 0, utf.length, key, val, splitPos);
+ }
- /**
- * Read a UTF-8 encoded line from a data input stream.
- * @param in data input stream
- * @return a byte array containing the line
- * @throws IOException
- */
- public static byte[] readLine(InputStream in) throws IOException {
- byte [] buf = new byte[128];
- byte [] lineBuffer = buf;
- int room = 128;
- int offset = 0;
- boolean isEOF = false;
- while (true) {
- int b = in.read();
- if (b == -1) {
- isEOF = true;
- break;
- }
+ /**
+ * Read a UTF-8 encoded line from a data input stream.
+ * @param in data input stream
+ * @return a byte array containing the line
+ * @throws IOException
+ */
+ public static byte[] readLine(InputStream in) throws IOException {
+ byte [] buf = new byte[128];
+ byte [] lineBuffer = buf;
+ int room = 128;
+ int offset = 0;
+ boolean isEOF = false;
+ while (true) {
+ int b = in.read();
+ if (b == -1) {
+ isEOF = true;
+ break;
+ }
- char c = (char)b;
- if (c == '\n')
- break;
+ char c = (char)b;
+ if (c == '\n')
+ break;
- if (c == '\r') {
- in.mark(1);
- int c2 = in.read();
- if(c2 == -1) {
- isEOF = true;
- break;
- }
- if (c2 != '\n') {
- // push it back
- in.reset();
- }
+ if (c == '\r') {
+ in.mark(1);
+ int c2 = in.read();
+ if(c2 == -1) {
+ isEOF = true;
break;
}
-
- if (--room < 0) {
- buf = new byte[offset + 128];
- room = buf.length - offset - 1;
- System.arraycopy(lineBuffer, 0, buf, 0, offset);
- lineBuffer = buf;
+ if (c2 != '\n') {
+ // push it back
+ in.reset();
}
- buf[offset++] = (byte) c;
+ break;
}
-
- if(isEOF && offset==0) {
- return null;
- } else {
- lineBuffer = new byte[offset];
- System.arraycopy(buf, 0, lineBuffer, 0, offset);
- return lineBuffer;
+
+ if (--room < 0) {
+ buf = new byte[offset + 128];
+ room = buf.length - offset - 1;
+ System.arraycopy(lineBuffer, 0, buf, 0, offset);
+ lineBuffer = buf;
}
+ buf[offset++] = (byte) c;
+ }
+
+ if(isEOF && offset==0) {
+ return null;
+ } else {
+ lineBuffer = new byte[offset];
+ System.arraycopy(buf, 0, lineBuffer, 0, offset);
+ return lineBuffer;
}
+ }
}
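The utility above is streaming's key/value splitter: findTab locates the first '\t' and splitKeyVal copies the bytes on either side of it into Text objects. A minimal sketch using only the public signatures shown in this file (the demo class name is hypothetical):

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.streaming.UTF8ByteArrayUtils;

    public class SplitKeyValDemo {
      public static void main(String[] args) throws IOException {
        byte[] line = "key1\tsome value".getBytes("UTF-8");
        int tab = UTF8ByteArrayUtils.findTab(line);  // index of the first tab, or -1
        Text key = new Text();
        Text val = new Text();
        if (tab != -1) {
          UTF8ByteArrayUtils.splitKeyVal(line, key, val, tab);
        }
        System.out.println(key + " -> " + val);      // prints: key1 -> some value
      }
    }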
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestGzipInput.java Mon Apr 16 14:44:35 2007
@@ -36,7 +36,7 @@
protected void createInput() throws IOException
{
GZIPOutputStream out = new GZIPOutputStream(
- new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+ new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
out.write(input.getBytes("UTF-8"));
out.close();
}
@@ -44,12 +44,12 @@
protected String[] genArgs() {
return new String[] {
- "-input", INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-mapper", map,
- "-combiner", combine,
- "-reducer", reduce,
- "-jobconf", "stream.recordreader.compression=gzip"
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-combiner", combine,
+ "-reducer", reduce,
+ "-jobconf", "stream.recordreader.compression=gzip"
};
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamedMerge.java Mon Apr 16 14:44:35 2007
@@ -83,12 +83,12 @@
// keys are compared as Strings and ties are broken by stream index
// For example (k1; stream 2) < (k1; stream 3)
String expect = i18n(
- unt(">1\tk1\tv1\n", tag) +
- unt(">2\tk1\tv2\n", tag) +
- unt(">3\tk1\tv3\n", tag) +
- unt(">2\tk2\tv4\n", tag) +
- unt(">1\tk3\tv5\n", tag)
- );
+ unt(">1\tk1\tv1\n", tag) +
+ unt(">2\tk1\tv2\n", tag) +
+ unt(">3\tk1\tv3\n", tag) +
+ unt(">2\tk2\tv4\n", tag) +
+ unt(">1\tk3\tv5\n", tag)
+ );
return expect;
}
@@ -128,18 +128,18 @@
void callStreaming(String argSideOutput, boolean inputTagged) throws IOException {
String[] testargs = new String[] {
- //"-jobconf", "stream.debug=1",
- "-verbose",
- "-jobconf", "stream.testmerge=1",
- "-input", "+/input/part-00 | /input/part-01 | /input/part-02",
- "-mapper", StreamUtil.localizeBin("/bin/cat"),
- "-reducer", "NONE",
- "-output", "/my.output",
- "-mapsideoutput", argSideOutput,
- "-dfs", conf_.get("fs.default.name"),
- "-jt", "local",
- "-jobconf", "stream.sideoutput.localfs=true",
- "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+ //"-jobconf", "stream.debug=1",
+ "-verbose",
+ "-jobconf", "stream.testmerge=1",
+ "-input", "+/input/part-00 | /input/part-01 | /input/part-02",
+ "-mapper", StreamUtil.localizeBin("/bin/cat"),
+ "-reducer", "NONE",
+ "-output", "/my.output",
+ "-mapsideoutput", argSideOutput,
+ "-dfs", conf_.get("fs.default.name"),
+ "-jt", "local",
+ "-jobconf", "stream.sideoutput.localfs=true",
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
};
ArrayList argList = new ArrayList();
argList.addAll(Arrays.asList(testargs));
@@ -156,23 +156,23 @@
SideEffectConsumer startSideEffectConsumer(StringBuffer outBuf) {
SideEffectConsumer t = new SideEffectConsumer(outBuf) {
- ServerSocket listen;
- Socket client;
- InputStream in;
+ ServerSocket listen;
+ Socket client;
+ InputStream in;
- InputStream connectInputStream() throws IOException {
- listen = new ServerSocket(SOC_PORT);
- client = listen.accept();
- in = client.getInputStream();
- return in;
- }
+ InputStream connectInputStream() throws IOException {
+ listen = new ServerSocket(SOC_PORT);
+ client = listen.accept();
+ in = client.getInputStream();
+ return in;
+ }
- void close() throws IOException
- {
- listen.close();
- System.out.println("@@@listen closed");
- }
- };
+ void close() throws IOException
+ {
+ listen.close();
+ System.out.println("@@@listen closed");
+ }
+ };
t.start();
return t;
}
@@ -264,7 +264,7 @@
sideOutput = "socket://localhost:" + SOC_PORT + "/";
} else {
String userOut = StreamUtil.getBoundAntProperty(
- "hadoop.test.localoutputfile", null);
+ "hadoop.test.localoutputfile", null);
if(userOut != null) {
f = new File(userOut);
// don't delete so they can mkfifo
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreaming.java Mon Apr 16 14:44:35 2007
@@ -55,30 +55,30 @@
protected void createInput() throws IOException
{
DataOutputStream out = new DataOutputStream(
- new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+ new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
out.write(input.getBytes("UTF-8"));
out.close();
}
protected String[] genArgs() {
return new String[] {
- "-input", INPUT_FILE.getAbsolutePath(),
- "-output", OUTPUT_DIR.getAbsolutePath(),
- "-mapper", map,
- "-combiner", combine,
- "-reducer", reduce,
- //"-verbose",
- //"-jobconf", "stream.debug=set"
- "-jobconf", "keep.failed.task.files=true",
- "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
- };
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-combiner", combine,
+ "-reducer", reduce,
+ //"-verbose",
+ //"-jobconf", "stream.debug=set"
+ "-jobconf", "keep.failed.task.files=true",
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp")
+ };
}
public void testCommandLine()
{
try {
try {
- OUTPUT_DIR.getAbsoluteFile().delete();
+ OUTPUT_DIR.getAbsoluteFile().delete();
} catch (Exception e) {
}
Modified: lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java (original)
+++ lucene/hadoop/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestSymLink.java Mon Apr 16 14:44:35 2007
@@ -66,16 +66,16 @@
String strJobtracker = "mapred.job.tracker=" + "localhost:" + mr.getJobTrackerPort();
String strNamenode = "fs.default.name=" + namenode;
String argv[] = new String[] {
- "-input", INPUT_FILE,
- "-output", OUTPUT_DIR,
- "-mapper", map,
- "-reducer", reduce,
- //"-verbose",
- //"-jobconf", "stream.debug=set"
- "-jobconf", strNamenode,
- "-jobconf", strJobtracker,
- "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
- "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
+ "-input", INPUT_FILE,
+ "-output", OUTPUT_DIR,
+ "-mapper", map,
+ "-reducer", reduce,
+ //"-verbose",
+ //"-jobconf", "stream.debug=set"
+ "-jobconf", strNamenode,
+ "-jobconf", strJobtracker,
+ "-jobconf", "stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+ "-cacheFile", "hdfs://"+fileSys.getName()+CACHE_FILE + "#testlink"
};
fileSys.delete(new Path(OUTPUT_DIR));
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/ExampleDriver.java Mon Apr 16 14:44:35 2007
@@ -26,22 +26,22 @@
*/
public class ExampleDriver {
- public static void main(String argv[]){
- ProgramDriver pgd = new ProgramDriver();
- try {
- pgd.addClass("wordcount", WordCount.class,
- "A map/reduce program that counts the words in the input files.");
- pgd.addClass("grep", Grep.class,
- "A map/reduce program that counts the matches of a regex in the input.");
- pgd.addClass("randomwriter", RandomWriter.class,
- "A map/reduce program that writes 10GB of random data per node.");
- pgd.addClass("sort", Sort.class, "A map/reduce program that sorts the data written by the random writer.");
- pgd.addClass("pi", PiEstimator.class, "A map/reduce program that estimates Pi using monte-carlo method.");
- pgd.driver(argv);
- }
- catch(Throwable e){
- e.printStackTrace();
- }
+ public static void main(String argv[]){
+ ProgramDriver pgd = new ProgramDriver();
+ try {
+ pgd.addClass("wordcount", WordCount.class,
+ "A map/reduce program that counts the words in the input files.");
+ pgd.addClass("grep", Grep.class,
+ "A map/reduce program that counts the matches of a regex in the input.");
+ pgd.addClass("randomwriter", RandomWriter.class,
+ "A map/reduce program that writes 10GB of random data per node.");
+ pgd.addClass("sort", Sort.class, "A map/reduce program that sorts the data written by the random writer.");
+ pgd.addClass("pi", PiEstimator.class, "A map/reduce program that estimates Pi using monte-carlo method.");
+ pgd.driver(argv);
}
+ catch(Throwable e){
+ e.printStackTrace();
+ }
+ }
}
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/PiEstimator.java Mon Apr 16 14:44:35 2007
@@ -63,25 +63,25 @@
* @param reporter
*/
public void map(WritableComparable key,
- Writable val,
- OutputCollector out,
- Reporter reporter) throws IOException {
- long nSamples = ((LongWritable) key).get();
- for(long idx = 0; idx < nSamples; idx++) {
- double x = r.nextDouble();
- double y = r.nextDouble();
- double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5);
- if (d > 0.25) {
- numOutside++;
- } else {
- numInside++;
- }
- if (idx%1000 == 1) {
- reporter.setStatus("Generated "+idx+" samples.");
- }
+ Writable val,
+ OutputCollector out,
+ Reporter reporter) throws IOException {
+ long nSamples = ((LongWritable) key).get();
+ for(long idx = 0; idx < nSamples; idx++) {
+ double x = r.nextDouble();
+ double y = r.nextDouble();
+ double d = (x-0.5)*(x-0.5)+(y-0.5)*(y-0.5);
+ if (d > 0.25) {
+ numOutside++;
+ } else {
+ numInside++;
}
- out.collect(new LongWritable(0), new LongWritable(numOutside));
- out.collect(new LongWritable(1), new LongWritable(numInside));
+ if (idx%1000 == 1) {
+ reporter.setStatus("Generated "+idx+" samples.");
+ }
+ }
+ out.collect(new LongWritable(0), new LongWritable(numOutside));
+ out.collect(new LongWritable(1), new LongWritable(numInside));
}
public void close() {
@@ -90,50 +90,50 @@
}
public static class PiReducer extends MapReduceBase implements Reducer {
- long numInside = 0;
- long numOutside = 0;
- JobConf conf;
+ long numInside = 0;
+ long numOutside = 0;
+ JobConf conf;
- /** Reducer configuration.
- *
- */
- public void configure(JobConf job) {
- conf = job;
- }
- /** Reduce method.
- * @param key
- * @param values
- * @param output
- * @param reporter
- */
- public void reduce(WritableComparable key,
- Iterator values,
- OutputCollector output,
- Reporter reporter) throws IOException {
- if (((LongWritable)key).get() == 1) {
- while (values.hasNext()) {
- long num = ((LongWritable)values.next()).get();
- numInside += num;
- }
- } else {
- while (values.hasNext()) {
- long num = ((LongWritable)values.next()).get();
- numOutside += num;
- }
- }
+ /** Reducer configuration.
+ *
+ */
+ public void configure(JobConf job) {
+ conf = job;
+ }
+ /** Reduce method.
+ * @param key
+ * @param values
+ * @param output
+ * @param reporter
+ */
+ public void reduce(WritableComparable key,
+ Iterator values,
+ OutputCollector output,
+ Reporter reporter) throws IOException {
+ if (((LongWritable)key).get() == 1) {
+ while (values.hasNext()) {
+ long num = ((LongWritable)values.next()).get();
+ numInside += num;
+ }
+ } else {
+ while (values.hasNext()) {
+ long num = ((LongWritable)values.next()).get();
+ numOutside += num;
+ }
}
+ }
- public void close() throws IOException {
- Path tmpDir = new Path("test-mini-mr");
- Path outDir = new Path(tmpDir, "out");
- Path outFile = new Path(outDir, "reduce-out");
- FileSystem fileSys = FileSystem.get(conf);
- SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
- outFile, LongWritable.class, LongWritable.class,
- CompressionType.NONE);
- writer.append(new LongWritable(numInside), new LongWritable(numOutside));
- writer.close();
- }
+ public void close() throws IOException {
+ Path tmpDir = new Path("test-mini-mr");
+ Path outDir = new Path(tmpDir, "out");
+ Path outFile = new Path(outDir, "reduce-out");
+ FileSystem fileSys = FileSystem.get(conf);
+ SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, conf,
+ outFile, LongWritable.class, LongWritable.class,
+ CompressionType.NONE);
+ writer.append(new LongWritable(numInside), new LongWritable(numOutside));
+ writer.close();
+ }
}
/**
@@ -141,7 +141,7 @@
* Monte Carlo method.
*/
static double launch(int numMaps, long numPoints, String jt, String dfs)
- throws IOException {
+ throws IOException {
Configuration conf = new Configuration();
JobConf jobConf = new JobConf(conf, PiEstimator.class);
@@ -180,7 +180,7 @@
for(int idx=0; idx < numMaps; ++idx) {
Path file = new Path(inDir, "part"+idx);
SequenceFile.Writer writer = SequenceFile.createWriter(fileSys, jobConf,
- file, LongWritable.class, LongWritable.class, CompressionType.NONE);
+ file, LongWritable.class, LongWritable.class, CompressionType.NONE);
writer.append(new LongWritable(numPoints), new LongWritable(0));
writer.close();
System.out.println("Wrote input for Map #"+idx);
@@ -193,10 +193,10 @@
long startTime = System.currentTimeMillis();
JobClient.runJob(jobConf);
System.out.println("Job Finished in "+
- (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+ (double)(System.currentTimeMillis() - startTime)/1000.0 + " seconds");
Path inFile = new Path(outDir, "reduce-out");
SequenceFile.Reader reader = new SequenceFile.Reader(fileSys, inFile,
- jobConf);
+ jobConf);
LongWritable numInside = new LongWritable();
LongWritable numOutside = new LongWritable();
reader.next(numInside, numOutside);
@@ -210,20 +210,20 @@
}
/**
- * Launches all the tasks in order.
- */
- public static void main(String[] argv) throws Exception {
- if (argv.length < 2) {
- System.err.println("Usage: TestMiniMR <nMaps> <nSamples>");
- return;
- }
+ * Launches all the tasks in order.
+ */
+ public static void main(String[] argv) throws Exception {
+ if (argv.length < 2) {
+ System.err.println("Usage: TestMiniMR <nMaps> <nSamples>");
+ return;
+ }
- int nMaps = Integer.parseInt(argv[0]);
- long nSamples = Long.parseLong(argv[1]);
+ int nMaps = Integer.parseInt(argv[0]);
+ long nSamples = Long.parseLong(argv[1]);
- System.out.println("Number of Maps = "+nMaps+" Samples per Map = "+nSamples);
+ System.out.println("Number of Maps = "+nMaps+" Samples per Map = "+nSamples);
- System.out.println("Estimated value of PI is "+
- launch(nMaps, nSamples, null, null));
- }
+ System.out.println("Estimated value of PI is "+
+ launch(nMaps, nSamples, null, null));
+ }
}
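The mapper above draws points uniformly from the unit square and tests d = (x-0.5)^2 + (y-0.5)^2 against 0.25, i.e. membership in the circle of radius 0.5 inscribed in the square; since the circle-to-square area ratio is pi/4, pi is estimated as 4 * numInside / (numInside + numOutside). The same core logic as a single-process sketch (no MapReduce; the class name is hypothetical):

    import java.util.Random;

    public class PiSketch {
      public static void main(String[] args) {
        long numInside = 0, numOutside = 0;
        long nSamples = 1000000;
        Random r = new Random();
        for (long idx = 0; idx < nSamples; idx++) {
          double x = r.nextDouble();
          double y = r.nextDouble();
          double d = (x - 0.5) * (x - 0.5) + (y - 0.5) * (y - 0.5);
          if (d > 0.25) {
            numOutside++;  // outside the inscribed circle
          } else {
            numInside++;
          }
        }
        // circle/square area ratio is pi/4, so pi ~ 4 * inside / total
        System.out.println("Estimated value of PI is "
            + 4.0 * numInside / (numInside + numOutside));
      }
    }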
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/RandomWriter.java Mon Apr 16 14:44:35 2007
@@ -135,16 +135,16 @@
String filename = ((Text) key).toString();
SequenceFile.Writer writer =
SequenceFile.createWriter(fileSys, jobConf, new Path(filename),
- BytesWritable.class, BytesWritable.class,
- CompressionType.NONE, reporter);
+ BytesWritable.class, BytesWritable.class,
+ CompressionType.NONE, reporter);
int itemCount = 0;
while (numBytesToWrite > 0) {
int keyLength = minKeySize +
- (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
+ (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
randomKey.setSize(keyLength);
randomizeBytes(randomKey.get(), 0, randomKey.getSize());
int valueLength = minValueSize +
- (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
+ (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
randomValue.setSize(valueLength);
randomizeBytes(randomValue.get(), 0, randomValue.getSize());
writer.append(randomKey, randomValue);
@@ -158,7 +158,7 @@
}
reporter.setStatus("done with " + itemCount + " records.");
writer.close();
- }
+ }
/**
* Save the values out of the configuration that we need to write
@@ -172,7 +172,7 @@
throw new RuntimeException("Can't get default file system", e);
}
numBytesToWrite = job.getLong("test.randomwrite.bytes_per_map",
- 1*1024*1024*1024);
+ 1*1024*1024*1024);
minKeySize = job.getInt("test.randomwrite.min_key", 10);
keySizeRange =
job.getInt("test.randomwrite.max_key", 1000) - minKeySize;
@@ -220,7 +220,7 @@
JobClient client = new JobClient(job);
ClusterStatus cluster = client.getClusterStatus();
int numMaps = cluster.getTaskTrackers() *
- job.getInt("test.randomwriter.maps_per_host", 10);
+ job.getInt("test.randomwriter.maps_per_host", 10);
job.setNumMapTasks(numMaps);
System.out.println("Running " + numMaps + " maps.");
job.setNumReduceTasks(1);
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/Sort.java Mon Apr 16 14:44:35 2007
@@ -67,9 +67,9 @@
JobClient client = new JobClient(jobConf);
ClusterStatus cluster = client.getClusterStatus();
int num_maps = cluster.getTaskTrackers() *
- jobConf.getInt("test.sort.maps_per_host", 10);
+ jobConf.getInt("test.sort.maps_per_host", 10);
int num_reduces = cluster.getTaskTrackers() *
- jobConf.getInt("test.sort.reduces_per_host", cluster.getMaxTasks());
+ jobConf.getInt("test.sort.reduces_per_host", cluster.getMaxTasks());
List<String> otherArgs = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
try {
@@ -96,7 +96,7 @@
// Make sure there are exactly 2 parameters left.
if (otherArgs.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
- otherArgs.size() + " instead of 2.");
+ otherArgs.size() + " instead of 2.");
printUsage();
}
jobConf.setInputPath(new Path((String) otherArgs.get(0)));
@@ -106,17 +106,17 @@
//job_conf.set("mapred.job.tracker", "local");
System.out.println("Running on " +
- cluster.getTaskTrackers() +
- " nodes to sort from " +
- jobConf.getInputPaths()[0] + " into " +
- jobConf.getOutputPath() + " with " + num_reduces + " reduces.");
+ cluster.getTaskTrackers() +
+ " nodes to sort from " +
+ jobConf.getInputPaths()[0] + " into " +
+ jobConf.getOutputPath() + " with " + num_reduces + " reduces.");
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
- (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
+ (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
}
}
Modified: lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java (original)
+++ lucene/hadoop/trunk/src/examples/org/apache/hadoop/examples/WordCount.java Mon Apr 16 14:44:35 2007
@@ -61,8 +61,8 @@
private Text word = new Text();
public void map(WritableComparable key, Writable value,
- OutputCollector output,
- Reporter reporter) throws IOException {
+ OutputCollector output,
+ Reporter reporter) throws IOException {
String line = ((Text)value).toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
@@ -79,8 +79,8 @@
public static class Reduce extends MapReduceBase implements Reducer {
public void reduce(WritableComparable key, Iterator values,
- OutputCollector output,
- Reporter reporter) throws IOException {
+ OutputCollector output,
+ Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += ((IntWritable) values.next()).get();
@@ -136,7 +136,7 @@
// Make sure there are exactly 2 parameters left.
if (other_args.size() != 2) {
System.out.println("ERROR: Wrong number of parameters: " +
- other_args.size() + " instead of 2.");
+ other_args.size() + " instead of 2.");
printUsage();
}
conf.setInputPath(new Path((String) other_args.get(0)));
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Block.java Mon Apr 16 14:44:35 2007
@@ -29,112 +29,112 @@
**************************************************/
class Block implements Writable, Comparable {
- static { // register a ctor
- WritableFactories.setFactory
- (Block.class,
- new WritableFactory() {
- public Writable newInstance() { return new Block(); }
- });
- }
-
- /**
- */
- public static boolean isBlockFilename(File f) {
- if (f.getName().startsWith("blk_")) {
- return true;
- } else {
- return false;
- }
- }
-
- long blkid;
- long len;
-
- /**
- */
- public Block() {
- this.blkid = 0;
- this.len = 0;
- }
-
- /**
- */
- public Block(long blkid, long len) {
- this.blkid = blkid;
- this.len = len;
- }
-
- /**
- * Find the blockid from the given filename
- */
- public Block(File f, long len) {
- String name = f.getName();
- name = name.substring("blk_".length());
- this.blkid = Long.parseLong(name);
- this.len = len;
- }
-
- /**
- */
- public long getBlockId() {
- return blkid;
- }
-
- /**
- */
- public String getBlockName() {
- return "blk_" + String.valueOf(blkid);
- }
-
- /**
- */
- public long getNumBytes() {
- return len;
- }
- public void setNumBytes(long len) {
- this.len = len;
- }
-
- /**
- */
- public String toString() {
- return getBlockName();
- }
-
- /////////////////////////////////////
- // Writable
- /////////////////////////////////////
- public void write(DataOutput out) throws IOException {
- out.writeLong(blkid);
- out.writeLong(len);
- }
-
- public void readFields(DataInput in) throws IOException {
- this.blkid = in.readLong();
- this.len = in.readLong();
- if( len < 0 ) {
- throw new IOException("Unexpected block size: " + len);
- }
- }
-
- /////////////////////////////////////
- // Comparable
- /////////////////////////////////////
- public int compareTo(Object o) {
- Block b = (Block) o;
- if ( blkid < b.blkid ) {
- return -1;
- } else if ( blkid == b.blkid ) {
- return 0;
- } else {
- return 1;
- }
- }
- public boolean equals(Object o) {
- return blkid == ((Block)o).blkid;
- }
+ static { // register a ctor
+ WritableFactories.setFactory
+ (Block.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new Block(); }
+ });
+ }
+
+ /**
+ */
+ public static boolean isBlockFilename(File f) {
+ if (f.getName().startsWith("blk_")) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ long blkid;
+ long len;
+
+ /**
+ */
+ public Block() {
+ this.blkid = 0;
+ this.len = 0;
+ }
+
+ /**
+ */
+ public Block(long blkid, long len) {
+ this.blkid = blkid;
+ this.len = len;
+ }
+
+ /**
+ * Find the blockid from the given filename
+ */
+ public Block(File f, long len) {
+ String name = f.getName();
+ name = name.substring("blk_".length());
+ this.blkid = Long.parseLong(name);
+ this.len = len;
+ }
+
+ /**
+ */
+ public long getBlockId() {
+ return blkid;
+ }
+
+ /**
+ */
+ public String getBlockName() {
+ return "blk_" + String.valueOf(blkid);
+ }
+
+ /**
+ */
+ public long getNumBytes() {
+ return len;
+ }
+ public void setNumBytes(long len) {
+ this.len = len;
+ }
+
+ /**
+ */
+ public String toString() {
+ return getBlockName();
+ }
+
+ /////////////////////////////////////
+ // Writable
+ /////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ out.writeLong(blkid);
+ out.writeLong(len);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.blkid = in.readLong();
+ this.len = in.readLong();
+ if( len < 0 ) {
+ throw new IOException("Unexpected block size: " + len);
+ }
+ }
+
+ /////////////////////////////////////
+ // Comparable
+ /////////////////////////////////////
+ public int compareTo(Object o) {
+ Block b = (Block) o;
+ if ( blkid < b.blkid ) {
+ return -1;
+ } else if ( blkid == b.blkid ) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ public boolean equals(Object o) {
+ return blkid == ((Block)o).blkid;
+ }
- public int hashCode() {
- return 37 * 17 + (int) (blkid^(blkid>>>32));
- }
+ public int hashCode() {
+ return 37 * 17 + (int) (blkid^(blkid>>>32));
+ }
}
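Block's ordering and identity are defined entirely by the block id: compareTo orders by blkid, equals compares only blkid, and toString returns the blk_<id> name. A small sketch (the demo class is hypothetical and would have to live in org.apache.hadoop.dfs, since Block is package-private):

    package org.apache.hadoop.dfs;

    import java.util.Arrays;

    public class BlockOrderDemo {
      public static void main(String[] args) {
        Block[] blocks = { new Block(42L, 1024L), new Block(7L, 512L) };
        Arrays.sort(blocks);            // compareTo: ascending block id
        System.out.println(blocks[0]);  // prints blk_7 (toString is getBlockName)
        System.out.println(blocks[0].equals(new Block(7L, 0L)));  // true: length ignored
      }
    }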
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?view=diff&rev=529410&r1=529409&r2=529410
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Mon Apr 16 14:44:35 2007
@@ -48,7 +48,7 @@
public void readFields(DataInput in) throws IOException {
this.action = (DatanodeProtocol.DataNodeAction)
- WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
+ WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
}
}