You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/06/24 07:35:28 UTC

svn commit: r1495927 - in /mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/common/iterator/ integration/src/main/java/org/apache/mahout/text/ integration/src/test/java/org/apache/mahout/text/

Author: smarthi
Date: Mon Jun 24 05:35:27 2013
New Revision: 1495927

URL: http://svn.apache.org/r1495927
Log:
MAHOUT-833: Make conversion to sequence files map-reduce - first round of code cleanup based on feedback from code review

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Mon Jun 24 05:35:27 2013
@@ -36,7 +36,6 @@ import org.apache.commons.cli2.builder.G
 import org.apache.commons.cli2.commandline.Parser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -117,34 +116,6 @@ public abstract class AbstractJob extend
     options = Lists.newLinkedList();
   }
 
-  /**
-   * Builds a comma-separated list of input splits
-   */
-  public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
-    StringBuilder dirList = new StringBuilder();
-    boolean bContainsFiles = false;
-
-    for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
-      if (childFileStatus.isDir()) {
-        String subDirectoryList = buildDirList(fs, childFileStatus);
-        if (subDirectoryList.length() > 0 && dirList.length() > 0) {
-          dirList.append(",");
-        }
-        dirList.append(subDirectoryList);
-      } else {
-        bContainsFiles = true;
-      }
-    }
-
-    if (bContainsFiles) {
-      if (dirList.length() > 0) {
-        dirList.append(",");
-      }
-      dirList.append(fileStatus.getPath().toUri().getPath());
-    }
-    return dirList.toString();
-  }
-
   /** Returns the input path established by a call to {@link #parseArguments(String[])}.
    *  The source of the path may be an input option added using {@link #addInputOption()}
    *  or it may be the value of the {@code mapred.input.dir} configuration

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java Mon Jun 24 05:35:27 2013
@@ -340,4 +340,59 @@ public final class HadoopUtil {
       Closeables.close(in, true);
     }
   }
+
+  /**
+   * Recursively lists, comma-separated, the directories at or below {@code fileStatus} that
+   * directly contain at least one file (each directory after its subdirectories' entries).
+   * @param fs file system used to list directory contents
+   * @param fileStatus status of the directory to scan
+   * @return URI path components (no scheme/authority); empty if no file is found
+   * @throws IOException if listing a directory fails
+   */
+  public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
+    StringBuilder joined = new StringBuilder();
+    boolean holdsFiles = false;
+    for (FileStatus child : fs.listStatus(fileStatus.getPath())) {
+      if (!child.isDir()) {
+        holdsFiles = true;
+        continue;
+      }
+      String nested = buildDirList(fs, child);
+      if (nested.length() > 0 && joined.length() > 0) {
+        joined.append(',');
+      }
+      joined.append(nested);
+    }
+    if (holdsFiles) {
+      joined.append(joined.length() > 0 ? "," : "").append(fileStatus.getPath().toUri().getPath());
+    }
+    return joined.toString();
+  }
+
+  /**
+   * Computes the path of {@code filePath} relative to the directory named by the
+   * {@code baseinputpath} entry of {@code conf}. The first {@code "file:"} is removed from both
+   * paths; the remainder after the first literal occurrence of the base path is returned, or the
+   * whole (stripped) file path if the base path does not occur in it.
+   *
+   * @param conf configuration; must contain {@code baseinputpath}
+   * @param filePath input file path
+   * @return relative file path
+   * @throws IOException if the file status cannot be read
+   * @throws IllegalStateException if {@code baseinputpath} is not set
+   */
+  public static String calcRelativeFilePath(Configuration conf, Path filePath) throws IOException {
+    FileSystem fs = filePath.getFileSystem(conf);
+    FileStatus fst = fs.getFileStatus(filePath);
+    String currentPath = fst.getPath().toString().replaceFirst("file:", "");
+
+    String basePath = conf.get("baseinputpath");
+    if (basePath == null) {
+      throw new IllegalStateException("baseinputpath is not set in the configuration");
+    }
+    basePath = (basePath.endsWith("/") ? basePath : basePath + "/").replaceFirst("file:", "");
+    // Literal match: String.split() would treat basePath as a regex (e.g. '+', '(', '[').
+    int start = currentPath.indexOf(basePath);
+    return start < 0 ? currentPath : currentPath.substring(start + basePath.length());
+  }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java Mon Jun 24 05:35:27 2013
@@ -31,10 +31,7 @@ import com.google.common.base.Charsets;
  * defines lines.
  * 
  * This class will uncompress files that end in .zip or .gz accordingly, too.
- * 
- * In case that you pass the class a stream of compressed bytes, simply pass the
- * original file name as well in the constructor and the system will decompress the bytes.
- * 
+ *
  */
 public final class FileLineIterable implements Iterable<String> {
 

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Mon Jun 24 05:35:27 2013
@@ -133,8 +133,8 @@ public class SequenceFilesFromDirectory 
     Configuration jobConfig = job.getConfiguration();
     jobConfig.set("keyPrefix", keyPrefix);
     FileSystem fs = FileSystem.get(jobConfig);
-    FileStatus fsFileStatus = fs.getFileStatus(input);
-    String inputDirList = buildDirList(fs, fsFileStatus);
+    FileStatus fsFileStatus = HadoopUtil.listStatus(fs, input)[0];
+    String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
     jobConfig.set("baseinputpath", input.toString());
 
     long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java Mon Jun 24 05:35:27 2013
@@ -17,6 +17,8 @@
 
 package org.apache.mahout.text;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
@@ -24,8 +26,7 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-import java.io.IOException;
+import org.apache.mahout.common.HadoopUtil;
 
 /**
  * Map class for SequenceFilesFromDirectory MR job
@@ -46,7 +47,7 @@ public class SequenceFilesFromDirectoryM
 
     Configuration configuration = context.getConfiguration();
     Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
-    String relativeFilePath = SequenceFilesFromMailArchivesMapper.calcRelativeFilePath(configuration, filePath);
+    String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
 
     String filename = this.keyPrefix.length() > 0 ?
       this.keyPrefix + Path.SEPARATOR + relativeFilePath :

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java Mon Jun 24 05:35:27 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.apache.mahout.utils.email.MailOptions;
 import org.apache.mahout.utils.email.MailProcessor;
@@ -322,10 +323,10 @@ public final class SequenceFilesFromMail
     }
 
     FileSystem fs = FileSystem.get(jobConfig);
-    FileStatus fsFileStatus = fs.getFileStatus(inputPath);
+    FileStatus fsFileStatus = HadoopUtil.listStatus(fs, inputPath)[0];
 
     jobConfig.set("baseinputpath", inputPath.toString());
-    String inputDirList = buildDirList(fs, fsFileStatus);
+    String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
     FileInputFormat.setInputPaths(job, inputDirList);
 
     long chunkSizeInBytes = chunkSize * 1024 * 1024;

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java Mon Jun 24 05:35:27 2013
@@ -34,11 +34,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.iterator.FileLineIterable;
 import org.apache.mahout.utils.email.MailOptions;
 import org.apache.mahout.utils.email.MailProcessor;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
@@ -230,33 +229,12 @@ public class SequenceFilesFromMailArchiv
     }
     contents.append(body);
   }
-  
-  public static String calcRelativeFilePath(Configuration conf, Path filePath) throws IOException {
-    FileSystem fs = filePath.getFileSystem(conf);
-    FileStatus fst = fs.getFileStatus(filePath);
-    String currentPath = fst.getPath().toString().replaceFirst("file:", "");
-
-    String basePath = conf.get("baseinputpath");
-    if (!basePath.endsWith("/")) {
-      basePath += "/";
-    }
-    basePath = basePath.replaceFirst("file:", "");
-    String[] parts = currentPath.split(basePath);
-
-    String hdfsStuffRemoved = currentPath; // default value
-    if (parts.length == 2) {
-      hdfsStuffRemoved = parts[1];
-    } else if (parts.length == 1) {
-      hdfsStuffRemoved = parts[0];
-    }
-    return hdfsStuffRemoved;
-  }
 
   public void map(IntWritable key, BytesWritable value, Context context)
     throws IOException, InterruptedException {
     Configuration configuration = context.getConfiguration();
     Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
-    String relativeFilePath = calcRelativeFilePath(configuration, filePath);
+    String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
     ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
     parseMboxLineByLine(relativeFilePath, is, context);
   }

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1495927&r1=1495926&r2=1495927&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Mon Jun 24 05:35:27 2013
@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
-import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
 import org.apache.mahout.utils.MahoutTestCase;
@@ -90,7 +90,7 @@ public final class TestSequenceFilesFrom
     createRecursiveDirFilesFromArrays(conf, inputDirRecursive, DATA2);
 
     FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive);
-    String dirs = AbstractJob.buildDirList(fs, fstInputPath);
+    String dirs = HadoopUtil.buildDirList(fs, fstInputPath);
 
     System.out.println("\n\n ----- recursive dirs: " + dirs);
     SequenceFilesFromDirectory.main(new String[]{
@@ -137,7 +137,7 @@ public final class TestSequenceFilesFrom
     createRecursiveDirFilesFromArrays(conf, inputDirRecur, DATA2);
 
     FileStatus fst_input_path = fs.getFileStatus(inputDirRecur);
-    String dirs = AbstractJob.buildDirList(fs, fst_input_path);
+    String dirs = HadoopUtil.buildDirList(fs, fst_input_path);
 
     logger.info("\n\n ---- recursive dirs: {}", dirs);