You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2010/04/01 13:05:27 UTC
svn commit: r929923 - in /lucene/mahout/trunk:
core/src/main/java/org/apache/mahout/cf/taste/hadoop/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/
core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/
core/src/main/java/org/apache/m...
Author: srowen
Date: Thu Apr 1 11:05:27 2010
New Revision: 929923
URL: http://svn.apache.org/viewvc?rev=929923&view=rev
Log:
First stab at MAHOUT-350 et al: overhaul AbstractJob and some argument handling for Hadoop
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/AbstractJob.java Thu Apr 1 11:05:27 2010
@@ -46,6 +46,28 @@ import org.apache.mahout.common.commandl
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * <p>Superclass of many Mahout Hadoop "jobs". A job drives configuration and launch of one or
+ * more maps and reduces in order to accomplish some task.</p>
+ *
+ * <p>Command line arguments available to all subclasses are:</p>
+ *
+ * <ul>
+ * <li>--tempDir (path): Specifies a directory where the job may place temp files
+ * (default "temp")</li>
+ * <li>--help: Show help message</li>
+ * </ul>
+ *
+ * <p>In addition, note some key command line parameters that are parsed by Hadoop itself, which
+ * may need to be set when running a job. These generic options must precede any job-specific arguments:</p>
+ *
+ * <ul>
+ * <li>-Dmapred.job.name=(name): Sets the Hadoop job name. The job class's simple name is appended to it, separated by '-'</li>
+ * <li>-Dmapred.output.compress={true,false}: Compress final output (set to true only if the property is not already defined in the configuration)</li>
+ * <li>-Dmapred.input.dir=(path): input file, or directory containing input files (required)</li>
+ * <li>-Dmapred.output.dir=(path): path to write output files (required)</li>
+ * </ul>
+ */
public abstract class AbstractJob extends Configured implements Tool {
private static final Logger log = LoggerFactory.getLogger(AbstractJob.class);
@@ -74,13 +96,12 @@ public abstract class AbstractJob extend
protected static Map<String,String> parseArguments(String[] args, Option... extraOpts) {
- Option inputOpt = DefaultOptionCreator.inputOption().create();
Option tempDirOpt = buildOption("tempDir", "t", "Intermediate output directory", "temp");
- Option outputOpt = DefaultOptionCreator.outputOption().create();
Option helpOpt = DefaultOptionCreator.helpOption();
- GroupBuilder gBuilder = new GroupBuilder().withName("Options").withOption(inputOpt)
- .withOption(tempDirOpt).withOption(outputOpt).withOption(helpOpt);
+ GroupBuilder gBuilder = new GroupBuilder().withName("Options")
+ .withOption(tempDirOpt)
+ .withOption(helpOpt);
for (Option opt : extraOpts) {
gBuilder = gBuilder.withOption(opt);
@@ -105,7 +126,7 @@ public abstract class AbstractJob extend
}
Map<String,String> result = new HashMap<String,String>();
- maybePut(result, cmdLine, inputOpt, tempDirOpt, outputOpt, helpOpt);
+ maybePut(result, cmdLine, tempDirOpt, helpOpt);
maybePut(result, cmdLine, extraOpts);
return result;
@@ -131,13 +152,14 @@ public abstract class AbstractJob extend
Class<? extends Writable> reducerValue,
Class<? extends OutputFormat> outputFormat) throws IOException {
- JobConf jobConf = new JobConf(getConf(), mapper);
+ JobConf jobConf = new JobConf(getConf(), getClass());
FileSystem fs = FileSystem.get(jobConf);
Path inputPathPath = new Path(inputPath).makeQualified(fs);
Path outputPathPath = new Path(outputPath).makeQualified(fs);
jobConf.setClass("mapred.input.format.class", inputFormat, InputFormat.class);
+ // Note: this overwrites any mapred.input.dir value already present in the configuration:
jobConf.set("mapred.input.dir", StringUtils.escapeString(inputPathPath.toString()));
jobConf.setClass("mapred.mapper.class", mapper, Mapper.class);
@@ -147,9 +169,17 @@ public abstract class AbstractJob extend
jobConf.setClass("mapred.reducer.class", reducer, Reducer.class);
jobConf.setClass("mapred.output.key.class", reducerKey, Writable.class);
jobConf.setClass("mapred.output.value.class", reducerValue, Writable.class);
- jobConf.setBoolean("mapred.output.compress", true);
-
+ if (jobConf.get("mapred.output.compress") == null) {
+ jobConf.setBoolean("mapred.output.compress", true);
+ // if the property was already set, its existing value is left unchanged
+ }
+ String customJobName = jobConf.get("mapred.job.name");
+ if (customJobName != null) {
+ jobConf.set("mapred.job.name", customJobName + '-' + getClass().getSimpleName());
+ }
+
jobConf.setClass("mapred.output.format.class", outputFormat, OutputFormat.class);
+ // Note: this overwrites any mapred.output.dir value already present in the configuration:
jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString()));
return jobConf;
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Thu Apr 1 11:05:27 2010
@@ -42,23 +42,19 @@ import org.apache.mahout.cf.taste.hadoop
import org.apache.mahout.math.VectorWritable;
/**
- * Runs a completely distributed recommender job as a series of mapreduces.</p>
+ * <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
*
- * <p>
- * Command line arguments are:
- * </p>
+ * <p>Command line arguments specific to this class are:</p>
*
* <ol>
- * <li>numRecommendations: Number of recommendations to compute per user (optional; default 10)</li>
- * <li>input: Directory containing a text file containing user IDs for which recommendations should be
+ * <li>--numRecommendations (integer): Number of recommendations to compute per user (optional; default 10)</li>
+ * <li>-Dmapred.input.dir=(path): Directory containing a text file containing user IDs for which recommendations should be
* computed, one per line</li>
- * <li>output: output path where recommender output should go</li>
- * <li>jarFile: JAR file containing implementation code</li>
- * <li>tempDir: directory in which to place intermediate data files (optional; default "temp")</li>
- * <li>usersFile: file containing user IDs to recommend for (optional)</li>
+ * <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
+ * <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
* </ol>
- *
- * @see org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
*/
public final class RecommenderJob extends AbstractJob {
@@ -74,9 +70,10 @@ public final class RecommenderJob extend
return -1;
}
- String inputPath = parsedArgs.get("--input");
+ Configuration originalConf = getConf();
+ String inputPath = originalConf.get("mapred.input.dir");
+ String outputPath = originalConf.get("mapred.output.dir");
String tempDirPath = parsedArgs.get("--tempDir");
- String outputPath = parsedArgs.get("--output");
int recommendationsPerUser = Integer.parseInt(parsedArgs.get("--numRecommendations"));
String usersFile = parsedArgs.get("--usersFile");
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/pseudo/RecommenderJob.java Thu Apr 1 11:05:27 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Map;
import org.apache.commons.cli2.Option;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -40,25 +41,22 @@ import org.apache.mahout.cf.taste.recomm
* {@link Recommender} instances on Hadoop, where each instance is a normal non-distributed implementation.
* </p>
*
- * <p>
- * This class configures and runs a {@link RecommenderReducer} using Hadoop.
- * </p>
+ * <p>This class configures and runs a {@link RecommenderReducer} using Hadoop.</p>
*
- * <p>
- * Command line arguments are:
- * </p>
+ * <p>Command line arguments specific to this class are:</p>
*
* <ol>
- * <li>recommenderClassName: Fully-qualified class name of {@link Recommender} to use to make recommendations.
+ * <li>--recommenderClassName (string): Fully-qualified class name of {@link Recommender} to use to make recommendations.
* Note that it must have a constructor which takes a {@link org.apache.mahout.cf.taste.model.DataModel}
* argument.</li>
- * <li>numRecommendations: Number of recommendations to compute per user</li>
- * <li>input: Location of a data model file containing preference data, suitable for use with
+ * <li>--numRecommendations (integer): Number of recommendations to compute per user</li>
+ * <li>-Dmapred.input.dir=(path): Location of a data model file containing preference data, suitable for use with
* {@link org.apache.mahout.cf.taste.impl.model.file.FileDataModel}</li>
- * <li>output: output path where recommender output should go</li>
- * <li>jarFile: JAR file containing implementation code</li>
- * <li>usersFile: file containing user IDs to recommend for (optional)</li>
+ * <li>-Dmapred.output.dir=(path): output path where recommender output should go</li>
+ * <li>--usersFile (path): file containing user IDs to recommend for (optional)</li>
* </ol>
+ *
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
*
* <p>
* For example, to get started trying this out, set up Hadoop in a pseudo-distributed manner:
@@ -92,8 +90,8 @@ import org.apache.mahout.cf.taste.recomm
* </p>
*
* {@code hadoop jar recommender.jar org.apache.mahout.cf.taste.hadoop.pseudo.RecommenderJob \
- * --recommenderClassName your.project.Recommender \ --numRecommendations 10 --input input/users.csv \
- * --output output --jarFile recommender.jar * }
+ * -Dmapred.input.dir=input/users.csv -Dmapred.output.dir=output \
+ * --recommenderClassName your.project.Recommender --numRecommendations 10 * }
*/
public final class RecommenderJob extends AbstractJob {
@@ -112,8 +110,10 @@ public final class RecommenderJob extend
if (parsedArgs == null) {
return -1;
}
- String inputFile = parsedArgs.get("--input");
- String outputPath = parsedArgs.get("--output");
+
+ Configuration originalConf = getConf();
+ String inputFile = originalConf.get("mapred.input.dir");
+ String outputPath = originalConf.get("mapred.output.dir");
String usersFile = parsedArgs.get("--usersFile");
if (usersFile == null) {
usersFile = inputFile;
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java Thu Apr 1 11:05:27 2010
@@ -20,6 +20,7 @@ package org.apache.mahout.cf.taste.hadoo
import java.io.IOException;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -47,8 +48,9 @@ public final class SlopeOneAverageDiffsJ
return -1;
}
- String prefsFile = parsedArgs.get("--input");
- String outputPath = parsedArgs.get("--output");
+ Configuration originalConf = getConf();
+ String prefsFile = originalConf.get("mapred.input.dir");
+ String outputPath = originalConf.get("mapred.output.dir");
String averagesOutputPath = parsedArgs.get("--tempDir");
JobConf prefsToDiffsJobConf = prepareJobConf(prefsFile, averagesOutputPath,
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/TransposeJob.java Thu Apr 1 11:05:27 2010
@@ -1,6 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.mahout.math.hadoop;
import org.apache.commons.cli2.Option;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -45,7 +63,8 @@ public class TransposeJob extends Abstra
"Number of columns of the input matrix");
Map<String,String> parsedArgs = parseArguments(strings, numRowsOpt, numColsOpt);
- String inputPathString = parsedArgs.get("--input");
+ Configuration originalConf = getConf();
+ String inputPathString = originalConf.get("mapred.input.dir");
String outputTmpPathString = parsedArgs.get("--tempDir");
int numRows = Integer.parseInt(parsedArgs.get("--numRows"));
int numCols = Integer.parseInt(parsedArgs.get("--numCols"));
@@ -105,12 +124,12 @@ public class TransposeJob extends Abstra
public static class TransposeReducer extends MapReduceBase
implements Reducer<IntWritable,DistributedRowMatrix.MatrixEntryWritable,IntWritable,VectorWritable> {
- private JobConf conf;
+ //private JobConf conf;
private int newNumCols;
@Override
public void configure(JobConf conf) {
- this.conf = conf;
+ //this.conf = conf;
newNumCols = conf.getInt(NUM_ROWS_KEY, Integer.MAX_VALUE);
}
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/DistributedLanczosSolver.java Thu Apr 1 11:05:27 2010
@@ -67,7 +67,8 @@ public class DistributedLanczosSolver ex
@Override
public int run(String[] strings) throws Exception {
- String inputPathString = parsedArgs.get("--input");
+ Configuration originalConfig = getConf();
+ String inputPathString = originalConfig.get("mapred.input.dir");
String outputTmpPathString = parsedArgs.get("--tempDir");
int numRows = Integer.parseInt(parsedArgs.get("--numRows"));
int numCols = Integer.parseInt(parsedArgs.get("--numCols"));
@@ -75,13 +76,13 @@ public class DistributedLanczosSolver ex
int desiredRank = Integer.parseInt(parsedArgs.get("--rank"));
Matrix eigenVectors = new DenseMatrix(desiredRank, numCols);
List<Double> eigenValues = new ArrayList<Double>();
- String outputEigenVectorPath = parsedArgs.get("--output");
+ String outputEigenVectorPath = originalConfig.get("mapred.output.dir");
DistributedRowMatrix matrix = new DistributedRowMatrix(inputPathString,
outputTmpPathString,
numRows,
numCols);
- matrix.configure(new JobConf(getConf()));
+ matrix.configure(new JobConf(originalConfig));
solve(matrix, desiredRank, eigenVectors, eigenValues, isSymmetric);
serializeOutput(eigenVectors, eigenValues, outputEigenVectorPath);
@@ -96,7 +97,7 @@ public class DistributedLanczosSolver ex
* @throws IOException
*/
public void serializeOutput(Matrix eigenVectors, List<Double> eigenValues, String outputPath) throws IOException {
- log.info("Persisting " + eigenVectors.numRows() + " eigenVectors and eigenValues to: " + outputPath);
+ log.info("Persisting {} eigenVectors and eigenValues to: {}", eigenVectors.numRows(), outputPath);
Path path = new Path(outputPath);
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/decomposer/EigenVerificationJob.java Thu Apr 1 11:05:27 2010
@@ -102,7 +102,8 @@ public class EigenVerificationJob extend
} else if (argMap.isEmpty()) {
return 0;
}
- outPath = argMap.get("--output");
+ Configuration originalConf = getConf();
+ outPath = originalConf.get("mapred.output.class");
tmpOut = outPath + "/tmp";
if(argMap.get("--eigenInput") != null && eigensToVerify == null) {
Modified: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java (original)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java Thu Apr 1 11:05:27 2010
@@ -17,6 +17,7 @@
package org.apache.mahout.text;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -31,16 +32,14 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.mahout.cf.taste.hadoop.AbstractJob;
import java.io.IOException;
-import java.util.Map;
-
public class TextParagraphSplittingJob extends AbstractJob {
@Override
public int run(String[] strings) throws Exception {
- Map<String,String> args = parseArguments(strings);
- JobConf conf = prepareJobConf(args.get("--input"),
- args.get("--output"),
+ Configuration originalConf = getConf();
+ JobConf conf = prepareJobConf(originalConf.get("mapred.input.dir"),
+ originalConf.get("mapred.output.dir"),
SequenceFileInputFormat.class,
SplitMap.class,
Text.class,
Modified: lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java?rev=929923&r1=929922&r2=929923&view=diff
==============================================================================
--- lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java (original)
+++ lucene/mahout/trunk/utils/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java Thu Apr 1 11:05:27 2010
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.mahout.utils.vectors;
import org.apache.hadoop.conf.Configuration;
@@ -13,18 +30,15 @@ import org.apache.mahout.math.VectorWrit
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-
public class RowIdJob extends AbstractJob {
private static final Logger log = LoggerFactory.getLogger(RowIdJob.class);
@Override
public int run(String[] strings) throws Exception {
- Map<String,String> parsedArgs = parseArguments(strings);
Configuration conf = getConf();
FileSystem fs = FileSystem.get(conf);
- Path inputPath = fs.makeQualified(new Path(parsedArgs.get("--input")));
- Path outputPath = fs.makeQualified(new Path(parsedArgs.get("--output")));
+ Path inputPath = fs.makeQualified(new Path(conf.get("mapred.input.dir")));
+ Path outputPath = fs.makeQualified(new Path(conf.get("mapred.output.dir")));
Path indexPath = new Path(outputPath, "docIndex");
Path matrixPath = new Path(outputPath, "matrix");
SequenceFile.Writer indexWriter = SequenceFile.createWriter(fs,