You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ad...@apache.org on 2010/01/09 11:02:49 UTC

svn commit: r897405 - in /lucene/mahout/trunk/core/src: main/java/org/apache/mahout/df/tools/ test/java/org/apache/mahout/df/mapred/partial/ test/java/org/apache/mahout/df/mapreduce/partial/

Author: adeneche
Date: Sat Jan  9 10:02:49 2010
New Revision: 897405

URL: http://svn.apache.org/viewvc?rev=897405&view=rev
Log:
MAHOUT-216

Modified:
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java
    lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java
    lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/Frequencies.java Sat Jan  9 10:02:49 2010
@@ -113,10 +113,10 @@
 
     // compute the partitions' sizes
     int numPartitions = counts.length;
-    int[] sizes = new int[numPartitions]; // TODO this isn't used?
-    for (int p = 0; p < numPartitions; p++) {
-      sizes[p] = DataUtils.sum(counts[p]);
-    }
+//    int[] sizes = new int[numPartitions]; // TODO this isn't used?
+//    for (int p = 0; p < numPartitions; p++) {
+//      sizes[p] = DataUtils.sum(counts[p]);
+//    }
 
     // outputing the frequencies
     log.info("counts[partition][class]");

Modified: lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java (original)
+++ lucene/mahout/trunk/core/src/main/java/org/apache/mahout/df/tools/UDistrib.java Sat Jan  9 10:02:49 2010
@@ -9,10 +9,7 @@
 import org.apache.commons.cli2.builder.GroupBuilder;
 import org.apache.commons.cli2.commandline.Parser;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.mahout.common.CommandLineUtil;
 import org.apache.mahout.common.RandomUtils;
 import org.apache.mahout.df.data.DataConverter;
@@ -22,6 +19,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.File;
 import java.util.Random;
 import java.util.Scanner;
 
@@ -103,14 +101,34 @@
 
     Configuration conf = new Configuration();
 
-    // TODO exception if numPArtitions <= 0
+    if (numPartitions <= 0) {
+        throw new IllegalArgumentException("numPartitions <= 0");
+    }
 
-    // create a new file corresponding to each partition
+    // make sure the output file does not exist
     Path outputPath = new Path(output);
     FileSystem fs = outputPath.getFileSystem(conf);
+
+    if (fs.exists(outputPath)) {
+        throw new IllegalArgumentException("Output path already exists");
+    }
+
+    // create a new file corresponding to each partition
+//    Path workingDir = fs.getWorkingDirectory();
+//    FileSystem wfs = workingDir.getFileSystem(conf);
+//    File parentFile = new File(workingDir.toString());
+//    File tempFile = FileUtil.createLocalTempFile(parentFile, "Parts", true);
+//    File tempFile = File.createTempFile("df.tools.UDistrib","");
+//    tempFile.deleteOnExit();
+    File tempFile = FileUtil.createLocalTempFile(new File(""), "df.tools.UDistrib", true);
+    Path partsPath = new Path(tempFile.toString());
+    FileSystem pfs = partsPath.getFileSystem(conf);
+
+    Path[] partPaths = new Path[numPartitions];
     FSDataOutputStream[] files = new FSDataOutputStream[numPartitions];
     for (int p = 0; p < numPartitions; p++) {
-      files[p] = fs.create(new Path(outputPath, String.format("part-%03d.data", p)));
+      partPaths[p] = new Path(partsPath, String.format("part.%03d", p));
+      files[p] = pfs.create(partPaths[p]);
     }
 
     Path datasetPath = new Path(datasetStr);
@@ -131,11 +149,12 @@
     FSDataInputStream input = ifs.open(dataPath);
     Scanner scanner = new Scanner(input);
     DataConverter converter = new DataConverter(dataset);
+    int nbInstances = dataset.nbInstances();
 
     int id = 0;
     while (scanner.hasNextLine()) {
       if ((id % 1000)==0) {
-        log.info("currentId : " + id);
+        log.info(String.format("progress : %d / %d", id, nbInstances));
       }
       
       String line = scanner.nextLine();
@@ -158,6 +177,24 @@
     scanner.close();
     for (FSDataOutputStream file : files)
       file.close();
+
+    // merge all output files
+    FileUtil.copyMerge(pfs, partsPath, fs, outputPath, true, conf, null);
+/*
+    FSDataOutputStream joined = fs.create(new Path(outputPath, "uniform.data"));
+    for (int p = 0; p < numPartitions; p++) {
+        log.info("Joining part : " + p);
+        FSDataInputStream partStream = fs.open(partPaths[p]);
+
+        IOUtils.copyBytes(partStream, joined, conf, false);
+
+        partStream.close();
+    }
+
+    joined.close();
+
+    fs.delete(partsPath, true);
+*/
   }
 
 }

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapred/partial/Step0JobTest.java Sat Jan  9 10:02:49 2010
@@ -55,6 +55,12 @@
 
   private static final int numMaps = 5;
 
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    RandomUtils.useTestSeed();
+  }
+
   public void testStep0Mapper() throws Exception {
     Random rng = RandomUtils.getRandom();
 

Modified: lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java
URL: http://svn.apache.org/viewvc/lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java?rev=897405&r1=897404&r2=897405&view=diff
==============================================================================
--- lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java (original)
+++ lucene/mahout/trunk/core/src/test/java/org/apache/mahout/df/mapreduce/partial/Step0JobTest.java Sat Jan  9 10:02:49 2010
@@ -64,6 +64,12 @@
 
   Step0Context context;
 
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    RandomUtils.useTestSeed();
+  }
+
   /**
    * Computes the "mapred.max.split.size" that will generate the desired number
    * of input splits