You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@mahout.apache.org by ad...@apache.org on 2010/01/09 11:02:49 UTC
svn commit: r897405 - in /lucene/mahout/trunk/core/src:
main/java/org/apache/mahout/df/tools/
test/java/org/apache/mahout/df/mapred/partial/
test/java/org/apache/mahout/df/mapreduce/partial/
Author: adeneche
Date: Sat Jan 9 10:02:49 2010
New Revision: 897405
URL: http://svn.apache.org/viewvc?rev=897405&view=rev
Log:
MAHOUT-216
Modified:
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java
lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java
lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java Sat Jan 9 10:02:49 2010
@@ -113,10 +113,10 @@
// compute the partitions' sizes
int numPartitions = counts.length;
- int[] sizes = new int[numPartitions]; // TODO this isn't used?
- for (int p = 0; p < numPartitions; p++) {
- sizes[p] = DataUtils.sum(counts[p]);
- }
+// int[] sizes = new int[numPartitions]; // TODO this isn't used?
+// for (int p = 0; p < numPartitions; p++) {
+// sizes[p] = DataUtils.sum(counts[p]);
+// }
// outputing the frequencies
log.info("counts[partition][class]");
Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java Sat Jan 9 10:02:49 2010
@@ -9,10 +9,7 @@
import org.apache.commons.cli2.builder.GroupBuilder;
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.RandomUtils;
import org.apache.mahout.df.data.DataConverter;
@@ -22,6 +19,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.io.File;
import java.util.Random;
import java.util.Scanner;
@@ -103,14 +101,34 @@
Configuration conf = new Configuration();
- // TODO exception if numPArtitions <= 0
+ if (numPartitions <= 0) {
+ throw new IllegalArgumentException("numPartitions <= 0");
+ }
- // create a new file corresponding to each partition
+ // make sure the output file does not exist
Path outputPath = new Path(output);
FileSystem fs = outputPath.getFileSystem(conf);
+
+ if (fs.exists(outputPath)) {
+ throw new IllegalArgumentException("Output path already exists");
+ }
+
+ // create a new file corresponding to each partition
+// Path workingDir = fs.getWorkingDirectory();
+// FileSystem wfs = workingDir.getFileSystem(conf);
+// File parentFile = new File(workingDir.toString());
+// File tempFile = FileUtil.createLocalTempFile(parentFile, "Parts", true);
+// File tempFile = File.createTempFile("df.tools.UDistrib","");
+// tempFile.deleteOnExit();
+ File tempFile = FileUtil.createLocalTempFile(new File(""), "df.tools.UDistrib", true);
+ Path partsPath = new Path(tempFile.toString());
+ FileSystem pfs = partsPath.getFileSystem(conf);
+
+ Path[] partPaths = new Path[numPartitions];
FSDataOutputStream[] files = new FSDataOutputStream[numPartitions];
for (int p = 0; p < numPartitions; p++) {
- files[p] = fs.create(new Path(outputPath, String.format("part-%03d.data", p)));
+ partPaths[p] = new Path(partsPath, String.format("part.%03d", p));
+ files[p] = pfs.create(partPaths[p]);
}
Path datasetPath = new Path(datasetStr);
@@ -131,11 +149,12 @@
FSDataInputStream input = ifs.open(dataPath);
Scanner scanner = new Scanner(input);
DataConverter converter = new DataConverter(dataset);
+ int nbInstances = dataset.nbInstances();
int id = 0;
while (scanner.hasNextLine()) {
if ((id % 1000)==0) {
- log.info("currentId : " + id);
+ log.info(String.format("progress : %d / %d", id, nbInstances));
}
String line = scanner.nextLine();
@@ -158,6 +177,24 @@
scanner.close();
for (FSDataOutputStream file : files)
file.close();
+
+ // merge all output files
+ FileUtil.copyMerge(pfs, partsPath, fs, outputPath, true, conf, null);
+/*
+ FSDataOutputStream joined = fs.create(new Path(outputPath, "uniform.data"));
+ for (int p = 0; p < numPartitions; p++) {
+ log.info("Joining part : " + p);
+ FSDataInputStream partStream = fs.open(partPaths[p]);
+
+ IOUtils.copyBytes(partStream, joined, conf, false);
+
+ partStream.close();
+ }
+
+ joined.close();
+
+ fs.delete(partsPath, true);
+*/
}
}
Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java Sat Jan 9 10:02:49 2010
@@ -55,6 +55,12 @@
private static final int numMaps = 5;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ RandomUtils.useTestSeed();
+ }
+
public void testStep0Mapper() throws Exception {
Random rng = RandomUtils.getRandom();
Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java Sat Jan 9 10:02:49 2010
@@ -64,6 +64,12 @@
Step0Context context;
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ RandomUtils.useTestSeed();
+ }
+
/**
* Computes the "mapred.max.split.size" that will generate the desired number
* of input splits