You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/06/23 20:15:57 UTC

svn commit: r1495863 - in /mahout/trunk: core/src/main/java/org/apache/mahout/common/ core/src/main/java/org/apache/mahout/common/iterator/ integration/src/main/java/org/apache/mahout/text/ integration/src/test/java/org/apache/mahout/text/

Author: smarthi
Date: Sun Jun 23 18:15:56 2013
New Revision: 1495863

URL: http://svn.apache.org/r1495863
Log:
MAHOUT-833: Make conversion to sequence files map-reduce - Checking in, tests pass

Added:
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
    mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Sun Jun 23 18:15:56 2013
@@ -36,6 +36,7 @@ import org.apache.commons.cli2.builder.G
 import org.apache.commons.cli2.commandline.Parser;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
@@ -116,6 +117,34 @@ public abstract class AbstractJob extend
     options = Lists.newLinkedList();
   }
 
+  /**
+   * Returns the directories at or below {@code fileStatus} that directly contain files, comma-separated.
+   */
+  public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
+    StringBuilder collected = new StringBuilder();
+    boolean holdsFiles = false;
+
+    for (FileStatus child : fs.listStatus(fileStatus.getPath())) {
+      if (!child.isDir()) {
+        holdsFiles = true;
+        continue;
+      }
+      String nested = buildDirList(fs, child);
+      if (!nested.isEmpty() && collected.length() > 0) {
+        collected.append(",");
+      }
+      collected.append(nested);
+    }
+
+    if (holdsFiles) {
+      if (collected.length() > 0) {
+        collected.append(",");
+      }
+      collected.append(fileStatus.getPath().toUri().getPath());
+    }
+    return collected.toString();
+  }
+
   /** Returns the input path established by a call to {@link #parseArguments(String[])}.
    *  The source of the path may be an input option added using {@link #addInputOption()}
    *  or it may be the value of the {@code mapred.input.dir} configuration
@@ -634,7 +663,6 @@ public abstract class AbstractJob extend
       // you can't instantiate it
       //ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
       AnalyzerUtils.createAnalyzer(analyzerClass);
-
     }
     return analyzerClass;
   }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java Sun Jun 23 18:15:56 2013
@@ -31,12 +31,17 @@ import com.google.common.base.Charsets;
  * defines lines.
  * 
  * This class will uncompress files that end in .zip or .gz accordingly, too.
+ * 
+ * If you pass this class a stream of compressed bytes, also pass the original file name
+ * to the constructor; its extension is used to decide how to decompress the bytes.
+ * 
  */
 public final class FileLineIterable implements Iterable<String> {
 
   private final InputStream is;
   private final Charset encoding;
   private final boolean skipFirstLine;
+  private final String origFilename;
   
   /** Creates a  over a given file, assuming a UTF-8 encoding. */
   public FileLineIterable(File file) throws IOException {
@@ -65,12 +70,21 @@ public final class FileLineIterable impl
     this.is = is;
     this.encoding = encoding;
     this.skipFirstLine = skipFirstLine;
+    this.origFilename = "";
+  }
+
+  public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine, String filename) {    
+    this.is = is;
+    this.encoding = encoding;
+    this.skipFirstLine = skipFirstLine;
+    this.origFilename = filename; // handed to FileLineIterator by iterator(); its extension selects the decompressor
   }
   
+  
   @Override
   public Iterator<String> iterator() {
     try {
-      return new FileLineIterator(is, encoding, skipFirstLine);
+      return new FileLineIterator(is, encoding, skipFirstLine, this.origFilename);
     } catch (IOException ioe) {
       throw new IllegalStateException(ioe);
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java Sun Jun 23 18:15:56 2013
@@ -31,6 +31,7 @@ import java.util.zip.ZipInputStream;
 import com.google.common.base.Charsets;
 import com.google.common.collect.AbstractIterator;
 import com.google.common.io.Closeables;
+import com.google.common.io.Files;
 import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Iterates over the lines of a text file. This assumes the text file's lines are delimited in a manner
  * consistent with how {@link BufferedReader} defines lines.
- * 
+ * <p/>
  * This class will uncompress files that end in .zip or .gz accordingly, too.
  */
 public final class FileLineIterator extends AbstractIterator<String> implements SkippingIterator<String>, Closeable {
@@ -47,11 +48,10 @@ public final class FileLineIterator exte
 
   private static final Logger log = LoggerFactory.getLogger(FileLineIterator.class);
 
-      /**
-       * Creates a  over a given file, assuming a UTF-8 encoding.
-       *
-       * @throws java.io.FileNotFoundException
-       *           if the file does not exist
+  /**
+   * Creates a {@link FileLineIterator} over a given file, assuming a UTF-8 encoding.
+   *
+   * @throws java.io.FileNotFoundException if the file does not exist
        * @throws IOException
        *           if the file cannot be read
        */
@@ -59,52 +59,66 @@ public final class FileLineIterator exte
   public FileLineIterator(File file) throws IOException {
     this(file, Charsets.UTF_8, false);
   }
-  
+
   /**
    * Creates a  over a given file, assuming a UTF-8 encoding.
-   * 
-   * @throws java.io.FileNotFoundException
-   *           if the file does not exist
-   * @throws IOException
-   *           if the file cannot be read
+   *
+   * @throws java.io.FileNotFoundException if the file does not exist
+   * @throws IOException                   if the file cannot be read
    */
   public FileLineIterator(File file, boolean skipFirstLine) throws IOException {
     this(file, Charsets.UTF_8, skipFirstLine);
   }
-  
+
   /**
    * Creates a  over a given file, using the given encoding.
-   * 
-   * @throws java.io.FileNotFoundException
-   *           if the file does not exist
-   * @throws IOException
-   *           if the file cannot be read
+   *
+   * @throws java.io.FileNotFoundException if the file does not exist
+   * @throws IOException                   if the file cannot be read
    */
   public FileLineIterator(File file, Charset encoding, boolean skipFirstLine) throws IOException {
     this(getFileInputStream(file), encoding, skipFirstLine);
   }
-  
+
   public FileLineIterator(InputStream is) throws IOException {
     this(is, Charsets.UTF_8, false);
   }
-  
+
   public FileLineIterator(InputStream is, boolean skipFirstLine) throws IOException {
     this(is, Charsets.UTF_8, skipFirstLine);
   }
-  
+
   public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine) throws IOException {
     reader = new BufferedReader(new InputStreamReader(is, encoding));
     if (skipFirstLine) {
       reader.readLine();
     }
   }
-  
+
+  public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine, String filename)
+    throws IOException {
+    // Reads lines from is, decompressing when filename ends in .gz or .zip; a null name means no extension.
+    String extension = filename == null ? "" : Files.getFileExtension(filename);
+    InputStream source = is;
+    if ("gz".equalsIgnoreCase(extension)) {
+      source = new GZIPInputStream(is);
+    } else if ("zip".equalsIgnoreCase(extension)) {
+      ZipInputStream zipped = new ZipInputStream(is);
+      zipped.getNextEntry(); // nothing is readable before this call; only the first entry is read
+      source = zipped;
+    }
+    reader = new BufferedReader(new InputStreamReader(source, encoding));
+    if (skipFirstLine) {
+      reader.readLine();
+    }
+  }
+
   static InputStream getFileInputStream(File file) throws IOException {
     InputStream is = new FileInputStream(file);
     String name = file.getName();
-    if (name.endsWith(".gz")) {
+    if ("gz".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) {
       return new GZIPInputStream(is);
-    } else if (name.endsWith(".zip")) {
+    } else if ("zip".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) {
       return new ZipInputStream(is);
     } else {
       return is;
@@ -127,7 +141,7 @@ public final class FileLineIterator exte
     return line == null ? endOfData() : line;
   }
 
-  
+
   @Override
   public void skip(int n) {
     try {
@@ -144,11 +158,11 @@ public final class FileLineIterator exte
       }
     }
   }
-  
+
   @Override
   public void close() throws IOException {
     endOfData();
     Closeables.close(reader, true);
   }
-  
+
 }

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ * Input format that gathers many text files into combined splits. Records are produced by a
+ * {@link CombineFileRecordReader} that is given {@link WholeFileRecordReader} as its
+ * underlying reader class.
+ */
+public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> {
+
+  /** Builds the combining reader for {@code inputSplit}, which must be a {@link CombineFileSplit}. */
+  @Override
+  public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
+      TaskAttemptContext taskAttemptContext) throws IOException {
+    CombineFileSplit combinedSplit = (CombineFileSplit) inputSplit;
+    return new CombineFileRecordReader<IntWritable, BytesWritable>(combinedSplit, taskAttemptContext,
+      WholeFileRecordReader.class);
+  }
+}

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Sun Jun 23 18:15:56 2013
@@ -17,17 +17,24 @@
 
 package org.apache.mahout.text;
 
-import java.lang.reflect.Constructor;
+import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.apache.mahout.utils.io.ChunkedWriter;
@@ -42,38 +49,50 @@ import org.apache.mahout.utils.io.Chunke
 public class SequenceFilesFromDirectory extends AbstractJob {
 
   private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
-  
+
   private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
-  private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass","filter"};
+  private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
   private static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
   private static final String[] CHARSET_OPTION = {"charset", "c"};
 
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new SequenceFilesFromDirectory(), args);
   }
-  
+
   /*
-   * callback main after processing hadoop parameters
-   */
+  * callback main after processing MapReduce parameters
+  */
   @Override
   public int run(String[] args) throws Exception {
-    addOptions();    
-    
+    addOptions();
+    addOption(DefaultOptionCreator.methodOption().create());
+    addOption(DefaultOptionCreator.overwriteOption().create());
+
     if (parseArguments(args) == null) {
       return -1;
     }
-   
+
     Map<String, String> options = parseOptions();
-    Path input = getInputPath();
     Path output = getOutputPath();
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
-      Configuration conf = new Configuration();
-      HadoopUtil.delete(conf, output);
+      HadoopUtil.delete(getConf(), output);
     }
-    String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
 
+    // Return the status of whichever runner was selected; a failed MapReduce job
+    // must not be reported to the caller as success.
+    if (getOption(DefaultOptionCreator.METHOD_OPTION,
+      DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+      return runSequential(getConf(), getInputPath(), output, options);
+    }
+
+    return runMapReduce(getInputPath(), output);
+  }
+
+  private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options)
+    throws IOException, InterruptedException, NoSuchMethodException {
+    // Running sequentially
     Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
-    Configuration conf = getConf();
+    String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
     FileSystem fs = FileSystem.get(input.toUri(), conf);
     ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output);
 
@@ -83,16 +102,9 @@ public class SequenceFilesFromDirectory 
       if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
         pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
       } else {
-        Class<? extends SequenceFilesFromDirectoryFilter> pathFilterClass =
-            Class.forName(fileFilterClassName).asSubclass(SequenceFilesFromDirectoryFilter.class);
-        Constructor<? extends SequenceFilesFromDirectoryFilter> constructor =
-            pathFilterClass.getConstructor(Configuration.class,
-                                           String.class,
-                                           Map.class,
-                                           ChunkedWriter.class,
-                                           Charset.class,
-                                           FileSystem.class);
-        pathFilter = constructor.newInstance(conf, keyPrefix, options, writer, charset, fs);
+        pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
+          new Class[]{Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
+          new Object[]{conf, keyPrefix, options, writer, charset, fs});
       }
       fs.listStatus(input, pathFilter);
     } finally {
@@ -101,25 +113,69 @@ public class SequenceFilesFromDirectory 
     return 0;
   }
 
+  /** Runs the conversion as a MapReduce job; returns 0 if the job succeeds, -1 otherwise. */
+  private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException {
+    int chunkSizeInMB = 64;
+    if (hasOption(CHUNK_SIZE_OPTION[0])) {
+      chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+    }
+
+    // Default to "" rather than null: a null value cannot be stored in the job configuration.
+    String keyPrefix = "";
+    if (hasOption(KEY_PREFIX_OPTION[0])) {
+      keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+    }
+
+    // Prepare Job for submission.
+    Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
+      SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
+      SequenceFileOutputFormat.class, "SequenceFilesFromDirectory");
+
+    Configuration jobConfig = job.getConfiguration();
+    jobConfig.set("keyPrefix", keyPrefix);
+    jobConfig.set("baseinputpath", input.toString());
+    // set the max split locations, otherwise we get nasty debug stuff
+    jobConfig.set("mapreduce.job.max.split.locations", "1000000");
+
+    // Every directory at or below the input that directly contains files becomes an input path.
+    FileSystem fs = FileSystem.get(jobConfig);
+    FileStatus fsFileStatus = fs.getFileStatus(input);
+    String inputDirList = buildDirList(fs, fsFileStatus);
+    FileInputFormat.setInputPaths(job, inputDirList);
+
+    // Widen to long before multiplying: plain int arithmetic overflows once the chunk size
+    // reaches 2048 MB.
+    long chunkSizeInBytes = chunkSizeInMB * 1024L * 1024L;
+    // need to set this to a multiple of the block size, or no split happens
+    FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+    FileOutputFormat.setCompressOutput(job, true);
+
+    boolean succeeded = job.waitForCompletion(true);
+    return succeeded ? 0 : -1;
+  }
+
   /**
    * Override this method in order to add additional options to the command line of the SequenceFileFromDirectory job.
    * Do not forget to call super() otherwise all standard options (input/output dirs etc) will not be available.
-   * */
+   */
   protected void addOptions() {
     addInputOption();
     addOutputOption();
     addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(DefaultOptionCreator.methodOption().create()); // read by run() to pick sequential vs. MapReduce
     addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
     addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
-        "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
+      "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
     addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
     addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
-        "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+      "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
   }
 
   /**
    * Override this method in order to parse your additional options from the command line. Do not forget to call
    * super() otherwise standard options (input/output dirs etc) will not be available.
+   *
+   * @return Map of options
    */
   protected Map<String, String> parseOptions() {
     Map<String, String> options = Maps.newHashMap();

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+import java.io.IOException;
+
+/**
+ * Map class for SequenceFilesFromDirectory MR job. Emits one (file name, file contents) pair per input record.
+ */
+public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+
+  private String keyPrefix;
+  private final Text fileValue = new Text();
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    this.keyPrefix = context.getConfiguration().get("keyPrefix", "");
+  }
+
+  /** Writes the name built from keyPrefix and the file's relative path as key, and the file's bytes as value. */
+  @Override
+  public void map(IntWritable key, BytesWritable value, Context context)
+    throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+    String relativeFilePath = SequenceFilesFromMailArchivesMapper.calcRelativeFilePath(configuration, filePath);
+    String filename = this.keyPrefix.length() > 0 ?
+      this.keyPrefix + Path.SEPARATOR + relativeFilePath :
+      Path.SEPARATOR + relativeFilePath;
+
+    // getBytes() exposes the whole backing buffer; only the first getLength() bytes are valid data.
+    fileValue.set(value.getBytes(), 0, value.getLength());
+    context.write(new Text(filename), fileValue);
+  }
+}

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java Sun Jun 23 18:15:56 2013
@@ -20,17 +20,23 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.charset.Charset;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 import org.apache.commons.cli2.builder.ArgumentBuilder;
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
@@ -48,21 +54,16 @@ import org.slf4j.LoggerFactory;
  */
 public final class SequenceFilesFromMailArchives extends AbstractJob {
 
-  private static final Logger log = LoggerFactory.getLogger(
-      SequenceFilesFromMailArchives.class);
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class);
 
   public void createSequenceFiles(MailOptions options) throws IOException {
-    ChunkedWriter writer = new ChunkedWriter(
-        getConf(), options.getChunkSize(), new Path(options.getOutputDir()));
-    MailProcessor processor = new MailProcessor(
-        options, options.getPrefix(), writer);
+    ChunkedWriter writer = new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()));
+    MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer);
     try {
       if (options.getInput().isDirectory()) {
-        PrefixAdditionFilter filter = new PrefixAdditionFilter(
-            processor, writer);
+        PrefixAdditionFilter filter = new PrefixAdditionFilter(processor, writer);
         options.getInput().listFiles(filter);
-        log.info("Parsed {} messages from {}", filter.getMessageCount(),
-            options.getInput().getAbsolutePath());
+        log.info("Parsed {} messages from {}", filter.getMessageCount(), options.getInput().getAbsolutePath());
       } else {
         long start = System.currentTimeMillis();
         long cnt = processor.parseMboxLineByLine(options.getInput());
@@ -94,12 +95,11 @@ public final class SequenceFilesFromMail
       if (current.isDirectory()) {
         log.info("At {}", current.getAbsolutePath());
         PrefixAdditionFilter nested = new PrefixAdditionFilter(
-            new MailProcessor(processor.getOptions(), processor.getPrefix()
-                + File.separator + current.getName(), writer), writer);
+          new MailProcessor(processor.getOptions(), processor.getPrefix()
+            + File.separator + current.getName(), writer), writer);
         current.listFiles(nested);
         long dirCount = nested.getMessageCount();
-        log.info("Parsed {} messages from directory {}", dirCount,
-            current.getAbsolutePath());
+        log.info("Parsed {} messages from directory {}", dirCount, current.getAbsolutePath());
         messageCount += dirCount;
       } else {
         try {
@@ -118,69 +118,69 @@ public final class SequenceFilesFromMail
 
   @Override
   public int run(String[] args) throws Exception {
-    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
-    ArgumentBuilder abuilder = new ArgumentBuilder();
-    //GroupBuilder gbuilder = new GroupBuilder();
+    DefaultOptionBuilder optionBuilder = new DefaultOptionBuilder();
+    ArgumentBuilder argumentBuilder = new ArgumentBuilder();
 
     addInputOption();
     addOutputOption();
+    addOption(DefaultOptionCreator.methodOption().create());
 
-    addOption(obuilder.withLongName("chunkSize").withArgument(
-        abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create())
-        .withDescription("The chunkSize in MegaBytes. Defaults to 64")
-        .withShortName("chunk").create());
-
-    addOption(obuilder.withLongName("keyPrefix").withArgument(
-        abuilder.withName("keyPrefix").withMinimum(1).withMaximum(1).create())
-        .withDescription("The prefix to be prepended to the key")
-        .withShortName("prefix").create());
-    addOption(obuilder.withLongName("charset")
-        .withRequired(true).withArgument(abuilder.withName("charset")
-            .withMinimum(1).withMaximum(1).create()).withDescription(
-            "The name of the character encoding of the input files")
-        .withShortName("c").create());
-    addOption(obuilder.withLongName("subject")
-        .withRequired(false).withDescription(
-            "Include the Mail subject as part of the text.  Default is false")
-        .withShortName("s").create());
-    addOption(obuilder.withLongName("to").withRequired(false)
-        .withDescription("Include the to field in the text.  Default is false")
-        .withShortName("to").create());
-    addOption(obuilder.withLongName("from").withRequired(false).withDescription(
-        "Include the from field in the text.  Default is false")
-        .withShortName("from").create());
-    addOption(obuilder.withLongName("references")
-        .withRequired(false).withDescription(
-            "Include the references field in the text.  Default is false")
-        .withShortName("refs").create());
-    addOption(obuilder.withLongName("body").withRequired(false)
-        .withDescription("Include the body in the output.  Default is false")
-        .withShortName("b").create());
-    addOption(obuilder.withLongName("stripQuoted")
-        .withRequired(false).withDescription(
-            "Strip (remove) quoted email text in the body.  Default is false")
-        .withShortName("q").create());
+    addOption(optionBuilder.withLongName("chunkSize").withArgument(
+      argumentBuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create())
+      .withDescription("The chunkSize in MegaBytes. Defaults to 64")
+      .withShortName("chunk").create());
+
+    addOption(optionBuilder.withLongName("keyPrefix").withArgument(
+      argumentBuilder.withName("keyPrefix").withMinimum(1).withMaximum(1).create())
+      .withDescription("The prefix to be prepended to the key")
+      .withShortName("prefix").create());
+    addOption(optionBuilder.withLongName("charset")
+      .withRequired(true).withArgument(argumentBuilder.withName("charset")
+        .withMinimum(1).withMaximum(1).create()).withDescription(
+        "The name of the character encoding of the input files")
+      .withShortName("c").create());
+    addOption(optionBuilder.withLongName("subject")
+      .withRequired(false).withDescription(
+        "Include the Mail subject as part of the text.  Default is false")
+      .withShortName("s").create());
+    addOption(optionBuilder.withLongName("to").withRequired(false)
+      .withDescription("Include the to field in the text.  Default is false")
+      .withShortName("to").create());
+    addOption(optionBuilder.withLongName("from").withRequired(false).withDescription(
+      "Include the from field in the text.  Default is false")
+      .withShortName("from").create());
+    addOption(optionBuilder.withLongName("references")
+      .withRequired(false).withDescription(
+        "Include the references field in the text.  Default is false")
+      .withShortName("refs").create());
+    addOption(optionBuilder.withLongName("body").withRequired(false)
+      .withDescription("Include the body in the output.  Default is false")
+      .withShortName("b").create());
+    addOption(optionBuilder.withLongName("stripQuoted")
+      .withRequired(false).withDescription(
+        "Strip (remove) quoted email text in the body.  Default is false")
+      .withShortName("q").create());
     addOption(
-        obuilder.withLongName("quotedRegex")
-            .withRequired(false).withArgument(abuilder.withName("regex")
-                .withMinimum(1).withMaximum(1).create()).withDescription(
-                "Specify the regex that identifies quoted text.  "
-                    + "Default is to look for > or | at the beginning of the line.")
-            .withShortName("q").create());
+      optionBuilder.withLongName("quotedRegex")
+        .withRequired(false).withArgument(argumentBuilder.withName("regex")
+        .withMinimum(1).withMaximum(1).create()).withDescription(
+        "Specify the regex that identifies quoted text.  "
+          + "Default is to look for > or | at the beginning of the line.")
+        .withShortName("q").create());
     addOption(
-        obuilder.withLongName("separator")
-            .withRequired(false).withArgument(abuilder.withName("separator")
-                .withMinimum(1).withMaximum(1).create()).withDescription(
-                "The separator to use between metadata items (to, from, etc.).  Default is \\n")
-            .withShortName("sep").create());
+      optionBuilder.withLongName("separator")
+        .withRequired(false).withArgument(argumentBuilder.withName("separator")
+        .withMinimum(1).withMaximum(1).create()).withDescription(
+        "The separator to use between metadata items (to, from, etc.).  Default is \\n")
+        .withShortName("sep").create());
 
     addOption(
-        obuilder.withLongName("bodySeparator")
-            .withRequired(false).withArgument(abuilder.withName("bodySeparator")
-                .withMinimum(1).withMaximum(1).create()).withDescription(
-                "The separator to use between lines in the body.  Default is \\n.  "
-                    + "Useful to change if you wish to have the message be on one line")
-            .withShortName("bodySep").create());
+      optionBuilder.withLongName("bodySeparator")
+        .withRequired(false).withArgument(argumentBuilder.withName("bodySeparator")
+        .withMinimum(1).withMaximum(1).create()).withDescription(
+        "The separator to use between lines in the body.  Default is \\n.  "
+          + "Useful to change if you wish to have the message be on one line")
+        .withShortName("bodySep").create());
     addOption(DefaultOptionCreator.helpOption());
     Map<String, List<String>> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
@@ -209,10 +209,9 @@ public final class SequenceFilesFromMail
 
     List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
     // patternOrder is used downstream so that we can know what order the text
-    // is in instead
-    // of encoding it in the string, which
+    // is in instead of encoding it in the string, which
     // would require more processing later to remove it pre feature selection.
-    Map<String,Integer> patternOrder = new HashMap<String,Integer>();
+    Map<String, Integer> patternOrder = Maps.newHashMap();
     int order = 0;
     if (hasOption("from")) {
       patterns.add(MailProcessor.FROM_PREFIX);
@@ -245,10 +244,101 @@ public final class SequenceFilesFromMail
     if (hasOption("quotedRegex")) {
       options.setQuotedTextPattern(Pattern.compile(getOption("quotedRegex")));
     }
+
+    if (getOption(DefaultOptionCreator.METHOD_OPTION,
+      DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+      runSequential(options);
+    } else {
+      runMapReduce(getInputPath(), getOutputPath());
+    }
+
+    return 0;
+  }
+
+  /**
+   * Converts the mail archives inside the current process by delegating to
+   * {@code createSequenceFiles(options)} and logs how long the conversion took.
+   *
+   * @param options the mail options handed on to {@code createSequenceFiles}
+   * @return always 0
+   */
+  private int runSequential(MailOptions options)
+    throws IOException, InterruptedException, NoSuchMethodException {
+
     long start = System.currentTimeMillis();
     createSequenceFiles(options);
     long finish = System.currentTimeMillis();
     log.info("Conversion took {}ms", finish - start);
+
+    return 0;
+  }
+
+  /**
+   * Runs the conversion as a Hadoop job that uses {@link SequenceFilesFromMailArchivesMapper}.
+   * <p>
+   * The command line options are copied into the job configuration under the keys that the mapper's
+   * {@code setup} reads ({@code prefix}, {@code chunkSize}, {@code charset}, {@code fromOpt}, {@code toOpt},
+   * {@code refsOpt}, {@code subjectOpt}, {@code quotedOpt}, {@code quotedRegexOpt}, {@code separatorOpt},
+   * {@code bodySeparatorOpt}, {@code bodyOpt}) plus {@code baseinputpath}, which the mapper uses to turn
+   * absolute file paths into paths relative to the input root.
+   *
+   * @param input  root of the mail archives to convert
+   * @param output directory the job writes its sequence files to
+   * @return 0 if the job completed successfully, -1 otherwise
+   */
+  private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
+
+    Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class,
+      Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives");
+
+    Configuration jobConfig = job.getConfiguration();
+
+    if (hasOption("keyPrefix")) {
+      jobConfig.set("prefix", getOption("keyPrefix"));
+    }
+
+    // stays 0 when --chunkSize is not given, in which case a maximum split size of 0 is passed on below
+    int chunkSize = 0;
+    if (hasOption("chunkSize")) {
+      chunkSize = Integer.parseInt(getOption("chunkSize"));
+      jobConfig.set("chunkSize", String.valueOf(chunkSize));
+    }
+
+    if (hasOption("charset")) {
+      // Charset.forName() rejects unknown names here rather than later inside the map tasks;
+      // name() is the canonical name that forName() accepts again on the mapper side
+      jobConfig.set("charset", Charset.forName(getOption("charset")).name());
+    }
+
+    if (hasOption("from")) {
+      jobConfig.set("fromOpt", "true");
+    }
+
+    if (hasOption("to")) {
+      jobConfig.set("toOpt", "true");
+    }
+
+    if (hasOption("references")) {
+      jobConfig.set("refsOpt", "true");
+    }
+
+    if (hasOption("subject")) {
+      jobConfig.set("subjectOpt", "true");
+    }
+
+    // the mapper reads the strip flag from "quotedOpt" and the regex from "quotedRegexOpt"
+    if (hasOption("stripQuoted")) {
+      jobConfig.set("quotedOpt", "true");
+    }
+
+    if (hasOption("quotedRegex")) {
+      // compiling validates the expression before the job is submitted
+      jobConfig.set("quotedRegexOpt", Pattern.compile(getOption("quotedRegex")).toString());
+    }
+
+    // the command line option is registered as "separator"; "separatorOpt" is only the configuration key
+    if (hasOption("separator")) {
+      jobConfig.set("separatorOpt", getOption("separator"));
+    } else {
+      jobConfig.set("separatorOpt", "\n");
+    }
+
+    if (hasOption("bodySeparator")) {
+      jobConfig.set("bodySeparatorOpt", getOption("bodySeparator"));
+    }
+
+    if (hasOption("body")) {
+      jobConfig.set("bodyOpt", "true");
+    } else {
+      jobConfig.set("bodyOpt", "false");
+    }
+
+    FileSystem fs = FileSystem.get(jobConfig);
+    FileStatus fsFileStatus = fs.getFileStatus(input);
+
+    jobConfig.set("baseinputpath", input.toString());
+    String inputDirList = buildDirList(fs, fsFileStatus);
+    FileInputFormat.setInputPaths(job, inputDirList);
+
+    // multiply as long: an int product overflows for chunk sizes of 2048 MB and above
+    long chunkSizeInBytes = chunkSize * 1024L * 1024L;
+    // need to set this to a multiple of the block size, or no split happens
+    FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+
+    // set the max split locations, otherwise we get nasty debug stuff
+    jobConfig.set("mapreduce.job.max.split.locations", "1000000");
+
+    if (!job.waitForCompletion(true)) {
+      return -1;
+    }
     return 0;
   }
 }

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.utils.email.MailOptions;
+import org.apache.mahout.utils.email.MailProcessor;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * 
+ * Map Class for the SequenceFilesFromMailArchives job
+ * 
+ */
+public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+  
+  private Text outKey = new Text();
+  private Text outValue = new Text();
+  
+  private static final Pattern MESSAGE_START = Pattern.compile(
+      "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
+  private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile(
+      "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
+
+  private MailOptions options;
+  
+  @Override
+  public void setup(Context context) throws IOException, InterruptedException {
+
+    Configuration conf = context.getConfiguration();
+    // absorb all of the options into the MailOptions object
+    
+    this.options = new MailOptions();
+
+    options.setPrefix(conf.get("prefix", ""));
+    
+    if (!conf.get("chunkSize", "").equals("")) {
+      options.setChunkSize(conf.getInt("chunkSize", 64));
+    }
+    
+    if (!conf.get("charset", "").equals("")) {
+      Charset charset = Charset.forName(conf.get("charset", "UTF-8"));
+      options.setCharset(charset);
+    } else {
+      Charset charset = Charset.forName("UTF-8");
+      options.setCharset(charset);
+    }
+    
+    List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
+    // patternOrder is used downstream so that we can know what order the
+    // text is in instead
+    // of encoding it in the string, which
+    // would require more processing later to remove it pre feature
+    // selection.
+    Map<String,Integer> patternOrder = Maps.newHashMap();
+    int order = 0;
+    
+    if (!conf.get("fromOpt", "").equals("")) {
+      patterns.add(MailProcessor.FROM_PREFIX);
+      patternOrder.put(MailOptions.FROM, order++);
+    }
+
+    if (!conf.get("toOpt", "").equals("")) {
+      patterns.add(MailProcessor.TO_PREFIX);
+      patternOrder.put(MailOptions.TO, order++);
+    }
+
+    if (!conf.get("refsOpt", "").equals("")) {
+      patterns.add(MailProcessor.REFS_PREFIX);
+      patternOrder.put(MailOptions.REFS, order++);
+    }
+    
+    if (!conf.get("subjectOpt", "").equals("")) {
+      patterns.add(MailProcessor.SUBJECT_PREFIX);
+      patternOrder.put(MailOptions.SUBJECT, order++);
+    }
+    
+    options.setStripQuotedText(conf.getBoolean("quotedOpt", false));
+    
+    options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
+    options.setPatternOrder(patternOrder);
+    
+    options.setIncludeBody(conf.getBoolean("bodyOpt", false));
+    
+    options.setSeparator("\n");
+    if (!conf.get("separatorOpt", "").equals("")) {
+      options.setSeparator(conf.get("separatorOpt", ""));
+    }
+    if (!conf.get("bodySeparatorOpt", "").equals("")) {
+      options.setBodySeparator(conf.get("bodySeparatorOpt", ""));
+    }
+    if (!conf.get("quotedRegexOpt", "").equals("")) {
+      options.setQuotedTextPattern(Pattern.compile(conf.get("quotedRegexOpt", "")));
+    }
+
+  }
+  
+  public long parseMboxLineByLine(String filename, InputStream mboxInputStream, Context context)
+    throws IOException, InterruptedException {
+    long messageCount = 0;
+    try {
+      StringBuilder contents = new StringBuilder();
+      StringBuilder body = new StringBuilder();
+      Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
+      Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
+      String[] patternResults = new String[options.getPatternsToMatch().length];
+      Matcher[] matchers = new Matcher[options.getPatternsToMatch().length];
+      for (int i = 0; i < matchers.length; i++) {
+        matchers[i] = options.getPatternsToMatch()[i].matcher("");
+      }
+      
+      String messageId = null;
+      boolean inBody = false;
+      Pattern quotedTextPattern = options.getQuotedTextPattern();
+      
+      for (String nextLine : new FileLineIterable(mboxInputStream, options.getCharset(), false, filename)) {
+        if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) {
+          for (int i = 0; i < matchers.length; i++) {
+            Matcher matcher = matchers[i];
+            matcher.reset(nextLine);
+            if (matcher.matches()) {
+              patternResults[i] = matcher.group(1);
+            }
+          }
+
+          // only start appending body content after we've seen a message ID
+          if (messageId != null) {
+            // first, see if we hit the end of the message
+            messageBoundaryMatcher.reset(nextLine);
+            if (messageBoundaryMatcher.matches()) {
+              // done parsing this message ... write it out
+              String key = generateKey(filename, options.getPrefix(), messageId);
+              // if this ordering changes, then also change
+              // FromEmailToDictionaryMapper
+              writeContent(options.getSeparator(), contents, body, patternResults);
+
+              this.outKey.set(key);
+              this.outValue.set(contents.toString());
+              context.write(this.outKey, this.outValue);
+              contents.setLength(0); // reset the buffer
+              body.setLength(0);
+              messageId = null;
+              inBody = false;
+            } else {
+              if (inBody && options.isIncludeBody()) {
+                if (!nextLine.isEmpty()) {
+                  body.append(nextLine).append(options.getBodySeparator());
+                }
+              } else {
+                // first empty line we see after reading the message Id
+                // indicates that we are in the body ...
+                inBody = nextLine.isEmpty();
+              }
+            }
+          } else {
+            if (nextLine.length() > 14) {
+              messageIdMatcher.reset(nextLine);
+              if (messageIdMatcher.matches()) {
+                messageId = messageIdMatcher.group(1);
+                ++messageCount;
+              }
+            }
+          }
+        }
+      }
+      // write the last message in the file if available
+      if (messageId != null) {
+        String key = generateKey(filename, options.getPrefix(), messageId);
+        writeContent(options.getSeparator(), contents, body, patternResults);
+        
+        this.outKey.set(key);
+        this.outValue.set(contents.toString());
+        context.write(this.outKey, this.outValue);
+        contents.setLength(0); // reset the buffer
+      }
+    } catch (FileNotFoundException ignored) {
+
+    }
+    // TODO: report exceptions and continue;
+    return messageCount;
+  }
+  
+  protected static String generateKey(String mboxFilename, String prefix, String messageId) {
+    return prefix + File.separator + mboxFilename + File.separator + messageId;
+  }
+  
+  private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) {
+    for (String match : matches) {
+      if (match != null) {
+        contents.append(match).append(separator);
+      } else {
+        contents.append("").append(separator);
+      }
+    }
+    contents.append(body);
+  }
+  
+  public static String calcRelativeFilePath(Configuration conf, Path filePath) throws IOException {
+    FileSystem fs = filePath.getFileSystem(conf);
+    FileStatus fst = fs.getFileStatus(filePath);
+    String currentPath = fst.getPath().toString().replaceFirst("file:", "");
+
+    String basePath = conf.get("baseinputpath");
+    if (!basePath.endsWith("/")) {
+      basePath += "/";
+    }
+    basePath = basePath.replaceFirst("file:", "");
+    String[] parts = currentPath.split(basePath);
+
+    String hdfsStuffRemoved = currentPath; // default value
+    if (parts.length == 2) {
+      hdfsStuffRemoved = parts[1];
+    } else if (parts.length == 1) {
+      hdfsStuffRemoved = parts[0];
+    }
+    return hdfsStuffRemoved;
+  }
+
+  public void map(IntWritable key, BytesWritable value, Context context)
+    throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+    String relativeFilePath = calcRelativeFilePath(configuration, filePath);
+    ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
+    parseMboxLineByLine(relativeFilePath, is, context);
+  }
+}

Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * RecordReader used with the MultipleTextFileInputFormat class to read full files as
+ * k/v pairs and groups of files as single input splits.
+ * <p>
+ * One reader handles one file of a {@link CombineFileSplit} and yields at most one record: the key is
+ * the index of the file inside the split, the value holds the bytes read from the file.
+ */
+public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> {
+
+  private final FileSplit fileSplit;
+  private final Configuration configuration;
+  private final BytesWritable value = new BytesWritable();
+  private final IntWritable index;
+  private boolean processed = false;
+
+  /**
+   * @param fileSplit          the combined split this reader takes one file from
+   * @param taskAttemptContext supplies the configuration used to open the file
+   * @param idx                index of the file inside {@code fileSplit}; also returned as the record key
+   */
+  public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
+    throws IOException {
+    this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx),
+       fileSplit.getLength(idx), fileSplit.getLocations());
+    this.configuration = taskAttemptContext.getConfiguration();
+    this.index = new IntWritable(idx);
+  }
+
+  @Override
+  public IntWritable getCurrentKey() {
+    return index;
+  }
+
+  @Override
+  public BytesWritable getCurrentValue() {
+    return value;
+  }
+
+  /** @return 1.0 once the single record has been handled, 0.0 before */
+  @Override
+  public float getProgress() throws IOException {
+    return processed ? 1.0f : 0.0f;
+  }
+
+  /** Nothing to do: everything needed was already taken from the constructor arguments. */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+    throws IOException, InterruptedException {
+
+  }
+
+  /**
+   * Loads the file into the value on the first call.
+   *
+   * @return {@code true} for the first call on a regular file, {@code false} afterwards or if the path
+   *         is not a regular file
+   * @throws IOException if the file cannot be read or is too large for a single byte array
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (processed) {
+      return false;
+    }
+
+    Path file = fileSplit.getPath();
+    FileSystem fs = file.getFileSystem(this.configuration);
+    if (!fs.isFile(file)) {
+      // nothing to read, and nothing will become readable later either
+      processed = true;
+      return false;
+    }
+
+    long length = fileSplit.getLength();
+    if (length > Integer.MAX_VALUE) {
+      // a plain (int) cast would silently truncate the length and yield a corrupt record
+      throw new IOException("Cannot read " + file + " as a single record: " + length + " bytes exceed the array limit");
+    }
+
+    byte[] contents = new byte[(int) length];
+    FSDataInputStream in = null;
+    boolean threw = true;
+    try {
+      in = fs.open(file);
+      IOUtils.readFully(in, contents, 0, contents.length);
+      // capacity is set to the exact size before the copy; presumably so that getBytes() on the
+      // value returns an array without trailing padding - TODO confirm
+      value.setCapacity(contents.length);
+      value.set(contents, 0, contents.length);
+      threw = false;
+    } finally {
+      // do not let a failing close() hide the exception that aborted the read
+      Closeables.close(in, threw);
+    }
+    processed = true;
+    return true;
+  }
+
+  /** Nothing to release: the input stream is closed inside {@link #nextKeyValue()}. */
+  @Override
+  public void close() throws IOException {
+  }
+}
\ No newline at end of file

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java Sun Jun 23 18:15:56 2013
@@ -22,9 +22,10 @@ import java.util.zip.GZIPOutputStream;
 
 import com.google.common.io.Closeables;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
 import org.apache.mahout.utils.MahoutTestCase;
@@ -36,11 +37,10 @@ import org.junit.Test;
  * Test case for the SequenceFilesFromMailArchives command-line application.
  */
 public final class SequenceFilesFromMailArchivesTest extends MahoutTestCase {
-  
+
   // TODO: Negative tests
 
-  private File inputDir = null;
-  private File outputDir = null;
+  private File inputDir;
 
   /**
    * Create the input and output directories needed for testing
@@ -51,8 +51,7 @@ public final class SequenceFilesFromMail
   public void setUp() throws Exception {
     super.setUp();
     inputDir = getTestTempDir("mail-archives-in");
-    outputDir = getTestTempDir("mail-archives-out");
-    
+
     // write test mail messages to a gzipped file in a nested directory
     File subDir = new File(inputDir, "subdir");
     subDir.mkdir();
@@ -64,58 +63,130 @@ public final class SequenceFilesFromMail
       gzOut.finish();
     } finally {
       Closeables.close(gzOut, false);
+    }
+    
+    File subDir2 = new File(subDir, "subsubdir");
+    subDir2.mkdir();
+    File gzFile2 = new File(subDir2, "mail-messages-2.gz");
+    try {
+      gzOut = new GZIPOutputStream(new FileOutputStream(gzFile2));
+      gzOut.write(testMailMessages.getBytes("UTF-8"));
+      gzOut.finish();
+    } finally {
+      Closeables.close(gzOut, false);
     }    
   }
 
-  /**
-   * Test the main method of the SequenceFilesFromMailArchives
-   * command-line application.
-   */
   @Test
-  public void testMain() throws Exception {
+  public void testSequential() throws Exception {
+
+    File outputDir = this.getTestTempDir("mail-archives-out");
+
     String[] args = {
-      "--input", inputDir.getAbsolutePath(),  
+      "--input", inputDir.getAbsolutePath(),
       "--output", outputDir.getAbsolutePath(),
       "--charset", "UTF-8",
       "--keyPrefix", "TEST",
-       "--body", "--subject", "--separator", ""
+      "--method", "sequential",
+      "--body", "--subject", "--separator", ""
     };
-    
+
     // run the application's main method
     SequenceFilesFromMailArchives.main(args);
-    
-    // app should create a single SequenceFile named "chunk-0"
-    // in the output dir
+
+    // app should create a single SequenceFile named "chunk-0" in the output dir
     File expectedChunkFile = new File(outputDir, "chunk-0");
     String expectedChunkPath = expectedChunkFile.getAbsolutePath();
-    Assert.assertTrue("Expected chunk file "+expectedChunkPath+" not found!", expectedChunkFile.isFile());
-
+    Assert.assertTrue("Expected chunk file " + expectedChunkPath + " not found!", expectedChunkFile.isFile());
 
     Configuration conf = new Configuration();
-    SequenceFileIterator<Text,Text> iterator =
-        new SequenceFileIterator<Text,Text>(new Path(expectedChunkPath), true, conf);
-
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(new Path(expectedChunkPath), true, conf);
     Assert.assertTrue("First key/value pair not found!", iterator.hasNext());
-    Pair<Text,Text> record = iterator.next();
+    Pair<Text, Text> record = iterator.next();
 
     File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz");
     Assert.assertEquals(new File(parentFile, testVars[0][0]).toString(), record.getFirst().toString());
-    Assert.assertEquals(testVars[0][1]+testVars[0][2], record.getSecond().toString());
+    Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
 
     Assert.assertTrue("Second key/value pair not found!", iterator.hasNext());
+
     record = iterator.next();
     Assert.assertEquals(new File(parentFile, testVars[1][0]).toString(), record.getFirst().toString());
-    Assert.assertEquals(testVars[1][1]+testVars[1][2], record.getSecond().toString());
+    Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+
+    record = iterator.next();
+    File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz");
+    Assert.assertEquals(new File(parentFileSubSubDir, testVars[0][0]).toString(), record.getFirst().toString());
+    Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+
+    Assert.assertTrue("Second key/value pair not found!", iterator.hasNext());
+    record = iterator.next();
+    Assert.assertEquals(new File(parentFileSubSubDir, testVars[1][0]).toString(), record.getFirst().toString());
+    Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
 
     Assert.assertFalse("Only two key/value pairs expected!", iterator.hasNext());
   }
 
-  
-  // Messages extracted and anonymized from the ASF mail archives
+  /**
+   * Runs the application with {@code --method mapreduce} on the nested test archives and checks that
+   * the single map output file holds the two test messages of each of the two archives.
+   */
+  @Test
+  public void testMapReduce() throws Exception {
+
+    Path tmpDir = this.getTestTempDirPath();
+    Path mrOutputDir = new Path(tmpDir, "mail-archives-out-mr");
+    Configuration configuration = new Configuration();
+    FileSystem fs = FileSystem.get(configuration);
+
+    String[] args = {
+      "--input", inputDir.getAbsolutePath(),
+      "--output", mrOutputDir.toString(),
+      "--charset", "UTF-8",
+      "--keyPrefix", "TEST",
+      "--method", "mapreduce",
+      "--body", "--subject", "--separator", ""
+    };
+
+    // run the application's main method
+    SequenceFilesFromMailArchives.main(args);
+
+    // the job should leave a single output file named "part-m-00000" in the output dir
+    Path partFile = mrOutputDir.suffix("/part-m-00000");
+    FileStatus[] fileStatuses = fs.listStatus(partFile);
+    Assert.assertEquals(1, fileStatuses.length); // only one
+    Assert.assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
+
+    // the records of the archive in the nested directory are expected first, then those of the other file
+    File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz");
+    File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz");
+    File[] expectedParents = {parentFileSubSubDir, parentFile};
+
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(partFile, true, configuration);
+    try {
+      int recordNumber = 0;
+      for (File expectedParent : expectedParents) {
+        for (String[] expected : testVars) {
+          recordNumber++;
+          // check before every next() so that a missing record fails with a clear message
+          Assert.assertTrue("Key/value pair " + recordNumber + " not found!", iterator.hasNext());
+          Pair<Text, Text> record = iterator.next();
+          Assert.assertEquals(new File(expectedParent, expected[0]).toString(), record.getFirst().toString());
+          Assert.assertEquals(expected[1] + expected[2], record.getSecond().toString());
+        }
+      }
+      Assert.assertFalse("Only four key/value pairs expected!", iterator.hasNext());
+    } finally {
+      // release the reader even when an assertion fails half way through
+      Closeables.close(iterator, true);
+    }
+  }
+
+  // Messages extracted and made anonymous from the ASF mail archives
   private static final String[][] testVars = {
     new String[] {
       "user@example.com",
-      "Ant task for JDK1.1 collections build option", 
+      "Ant task for JDK1.1 collections build option",
       "\nThis is just a test message\n--\nTesty McTester\n"
     },
     new String[] {
@@ -124,38 +195,38 @@ public final class SequenceFilesFromMail
       "\nHi all,\nThis is another test message.\nRegards,\nAnother Test\n"
     }
   };
-  
+
   private static final String testMailMessages =
     "From user@example.com  Mon Jul 24 19:13:53 2000\n"
-    + "Return-Path: <us...@example.com>\n"
-    + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
-    + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
-    + "Received: (qmail 49267 invoked from network); 24 Jul 2000 19:13:53 -0000\n"
-    + "Message-ID: <"+testVars[0][0]+">\n"
-    + "From: \"Testy McTester\" <us...@example.com>\n"
-    + "To: <an...@jakarta.apache.org>\n"
-    + "Subject: "+testVars[0][1]+ '\n' 
-    + "Date: Mon, 24 Jul 2000 12:24:56 -0700\n"
-    + "MIME-Version: 1.0\n"
-    + "Content-Type: text/plain;\n"
-    + "  charset=\"Windows-1252\"\n"
-    + "Content-Transfer-Encoding: 7bit\n"
-    + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
-    + testVars[0][2]+'\n' 
-    + "From somebody@example.com  Wed Jul 26 11:32:16 2000\n"
-    + "Return-Path: <so...@example.com>\n"
-    + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
-    + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
-    + "Received: (qmail 73966 invoked from network); 26 Jul 2000 11:32:16 -0000\n"
-    + "User-Agent: Microsoft-Outlook-Express-Macintosh-Edition/5.02.2022\n"
-    + "Date: Wed, 26 Jul 2000 13:32:08 +0200\n"
-    + "Subject: "+testVars[1][1]+ '\n'
-    + "From: Another Test <so...@example.com>\n"
-    + "To: <an...@jakarta.apache.org>\n"
-    + "Message-Id: <"+testVars[1][0]+">\n"
-    + "Mime-Version: 1.0\n"
-    + "Content-Type: text/plain; charset=\"US-ASCII\"\n"
-    + "Content-Transfer-Encoding: 7bit\n"
-    + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
-    + testVars[1][2];
+      + "Return-Path: <us...@example.com>\n"
+      + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
+      + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
+      + "Received: (qmail 49267 invoked from network); 24 Jul 2000 19:13:53 -0000\n"
+      + "Message-ID: <" + testVars[0][0] + ">\n"
+      + "From: \"Testy McTester\" <us...@example.com>\n"
+      + "To: <an...@jakarta.apache.org>\n"
+      + "Subject: " + testVars[0][1] + '\n'
+      + "Date: Mon, 24 Jul 2000 12:24:56 -0700\n"
+      + "MIME-Version: 1.0\n"
+      + "Content-Type: text/plain;\n"
+      + "  charset=\"Windows-1252\"\n"
+      + "Content-Transfer-Encoding: 7bit\n"
+      + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
+      + testVars[0][2] + '\n'
+      + "From somebody@example.com  Wed Jul 26 11:32:16 2000\n"
+      + "Return-Path: <so...@example.com>\n"
+      + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
+      + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
+      + "Received: (qmail 73966 invoked from network); 26 Jul 2000 11:32:16 -0000\n"
+      + "User-Agent: Microsoft-Outlook-Express-Macintosh-Edition/5.02.2022\n"
+      + "Date: Wed, 26 Jul 2000 13:32:08 +0200\n"
+      + "Subject: " + testVars[1][1] + '\n'
+      + "From: Another Test <so...@example.com>\n"
+      + "To: <an...@jakarta.apache.org>\n"
+      + "Message-Id: <" + testVars[1][0] + ">\n"
+      + "Mime-Version: 1.0\n"
+      + "Content-Type: text/plain; charset=\"US-ASCII\"\n"
+      + "Content-Transfer-Encoding: 7bit\n"
+      + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
+      + testVars[1][2];
 }

Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Sun Jun 23 18:15:56 2013
@@ -17,6 +17,7 @@
 
 package org.apache.mahout.text;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.Map;
@@ -30,53 +31,160 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Text;
+import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.Pair;
 import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
 import org.apache.mahout.utils.MahoutTestCase;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class TestSequenceFilesFromDirectory extends MahoutTestCase {
 
+  private static final Logger logger = LoggerFactory.getLogger(TestSequenceFilesFromDirectory.class);
+
   private static final String[][] DATA1 = {
-      {"test1", "This is the first text."},
-      {"test2", "This is the second text."},
-      {"test3", "This is the third text."}
+    {"test1", "This is the first text."},
+    {"test2", "This is the second text."},
+    {"test3", "This is the third text."}
+  };
+
+  private static final String[][] DATA2 = {
+    {"recursive_test1", "This is the first text."},
+    {"recursive_test2", "This is the second text."},
+    {"recursive_test3", "This is the third text."}
   };
 
-  /**
-   * Story converting text files to SequenceFile
-   */
   @Test
   public void testSequenceFileFromDirectoryBasic() throws Exception {
     // parameters
     Configuration conf = new Configuration();
-    
+
     FileSystem fs = FileSystem.get(conf);
-    
+
     // create
     Path tmpDir = this.getTestTempDirPath();
     Path inputDir = new Path(tmpDir, "inputDir");
     fs.mkdirs(inputDir);
+
     Path outputDir = new Path(tmpDir, "outputDir");
-    
+    Path outputDirRecursive = new Path(tmpDir, "outputDirRecursive");
+
+    Path inputDirRecursive = new Path(tmpDir, "inputDirRecur");
+    fs.mkdirs(inputDirRecursive);
+
     // prepare input files
     createFilesFromArrays(conf, inputDir, DATA1);
 
-    SequenceFilesFromDirectory.main(new String[] {
-        "--input", inputDir.toString(),
-        "--output", outputDir.toString(),
-        "--chunkSize", "64",
-        "--charset", Charsets.UTF_8.name(),
-        "--keyPrefix", "UID"});
-    
+    SequenceFilesFromDirectory.main(new String[]{
+      "--input", inputDir.toString(),
+      "--output", outputDir.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--keyPrefix", "UID",
+      "--method", "sequential"});
+
     // check output chunk files
     checkChunkFiles(conf, outputDir, DATA1, "UID");
+
+    createRecursiveDirFilesFromArrays(conf, inputDirRecursive, DATA2);
+
+    FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive);
+    String dirs = AbstractJob.buildDirList(fs, fstInputPath);
+
+    System.out.println("\n\n ----- recursive dirs: " + dirs);
+    SequenceFilesFromDirectory.main(new String[]{
+      "--input", inputDirRecursive.toString(),
+      "--output", outputDirRecursive.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--keyPrefix", "UID",
+      "--method", "sequential"});
+
+    checkRecursiveChunkFiles(conf, outputDirRecursive, DATA2, "UID");
   }
 
+  /**
+   * Runs the directory-to-sequence-file conversion with {@code --method mapreduce}, first on a
+   * flat directory filled from {@code DATA1} and then on a nested directory tree filled from
+   * {@code DATA2}, validating the {@code part-m-00000} output after each run.
+   */
+  @Test
+  public void testSequenceFileFromDirectoryMapReduce() throws Exception {
+    Configuration configuration = new Configuration();
+    FileSystem fileSystem = FileSystem.get(configuration);
+    Path testRoot = this.getTestTempDirPath();
+
+    // flat layout: createFilesFromArrays writes one file per DATA1 entry directly under it
+    Path flatInput = new Path(testRoot, "inputDir");
+    fileSystem.mkdirs(flatInput);
+
+    // nested layout: populated further down by createRecursiveDirFilesFromArrays
+    Path nestedInput = new Path(testRoot, "inputDirRecur");
+    fileSystem.mkdirs(nestedInput);
+
+    Path flatOutput = new Path(testRoot, "mrOutputDir");
+    Path nestedOutput = new Path(testRoot, "mrOutputDirRecur");
+
+    // --- first run: flat directory ---
+    createFilesFromArrays(configuration, flatInput, DATA1);
+
+    String[] flatArgs = {
+      "--input", flatInput.toString(),
+      "--output", flatOutput.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--method", "mapreduce",
+      "--keyPrefix", "UID"};
+    SequenceFilesFromDirectory.main(flatArgs);
+
+    checkMRResultFiles(configuration, flatOutput, DATA1, "UID");
+
+    // --- second run: nested directories ---
+    createRecursiveDirFilesFromArrays(configuration, nestedInput, DATA2);
+
+    // the directory list is only logged here; it is not handed to the job by this test
+    FileStatus nestedInputStatus = fileSystem.getFileStatus(nestedInput);
+    String directoryList = AbstractJob.buildDirList(fileSystem, nestedInputStatus);
+    logger.info("\n\n ---- recursive dirs: {}", directoryList);
+
+    String[] nestedArgs = {
+      "--input", nestedInput.toString(),
+      "--output", nestedOutput.toString(),
+      "--chunkSize", "64",
+      "--charset", Charsets.UTF_8.name(),
+      "--method", "mapreduce",
+      "--keyPrefix", "UID"};
+    SequenceFilesFromDirectory.main(nestedArgs);
+
+    checkMRResultFilesRecursive(configuration, nestedOutput, DATA2, "UID");
+  }
+
+
   private static void createFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
     FileSystem fs = FileSystem.get(conf);
+    OutputStreamWriter writer;
     for (String[] aData : data) {
-      OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), Charsets.UTF_8);
+      writer = new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), Charsets.UTF_8);
+      try {
+        writer.write(aData[1]);
+      } finally {
+        Closeables.close(writer, false);
+      }
+    }
+  }
+
+  private static void createRecursiveDirFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    logger.info("creativeRecursiveDirFilesFromArrays > based on: {}", inputDir.toString());
+    Path curPath;
+    String currentRecursiveDir = inputDir.toString();
+
+    for (String[] aData : data) {
+      OutputStreamWriter writer;
+
+      currentRecursiveDir += "/" + aData[0];
+      File subDir = new File(currentRecursiveDir);
+      subDir.mkdir();
+
+      curPath = new Path(subDir.toString(), "file.txt");
+      writer = new OutputStreamWriter(fs.create(curPath), Charsets.UTF_8);
+
+      logger.info("Created file: {}", curPath.toString());
+
       try {
         writer.write(aData[1]);
       } finally {
@@ -90,24 +198,22 @@ public final class TestSequenceFilesFrom
                                       String[][] data,
                                       String prefix) throws IOException {
     FileSystem fs = FileSystem.get(conf);
-    
+
     // output exists?
-    FileStatus[] fstats = fs.listStatus(outputDir, new ExcludeDotFiles());
-    assertEquals(1, fstats.length); // only one
-    assertEquals("chunk-0", fstats[0].getPath().getName());
-    
+    FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("chunk-0", fileStatuses[0].getPath().getName());
 
-    Map<String,String> fileToData = Maps.newHashMap();
+    Map<String, String> fileToData = Maps.newHashMap();
     for (String[] aData : data) {
       fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
     }
 
     // read a chunk to check content
-    SequenceFileIterator<Text,Text> iterator = new SequenceFileIterator<Text,Text>(fstats[0].getPath(), true, conf);
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(fileStatuses[0].getPath(), true, conf);
     try {
-      for (String[] datum : data) {
-        assertTrue(iterator.hasNext());
-        Pair<Text,Text> record = iterator.next();
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
         String retrievedData = fileToData.get(record.getFirst().toString().trim());
         assertNotNull(retrievedData);
         assertEquals(retrievedData, record.getSecond().toString().trim());
@@ -116,16 +222,116 @@ public final class TestSequenceFilesFrom
       Closeables.close(iterator, true);
     }
   }
-  
+
   /**
-   * exclude hidden (starting with dot) files
+   * exclude files whose names start with a dot (hidden) or with an underscore
    */
   private static class ExcludeDotFiles implements PathFilter {
     @Override
     public boolean accept(Path file) {
-      return !file.getName().startsWith(".");
+      return !file.getName().startsWith(".") && !file.getName().startsWith("_");
     }
   }
 
+  /**
+   * Verifies the single {@code chunk-0} file written for a nested input tree.
+   * <p>
+   * The key expected for entry <i>i</i> of {@code data} is {@code prefix} followed by the names
+   * {@code data[0][0]} ... {@code data[i][0]} and {@code file.txt}, all joined with
+   * {@link Path#SEPARATOR}; the expected value is {@code data[i][1]}. Every record read from the
+   * chunk must carry one of these keys together with the matching (trimmed) value.
+   *
+   * @param conf      configuration used to obtain the file system and to read the chunk
+   * @param outputDir directory that must hold exactly one accepted file, named {@code chunk-0}
+   * @param data      {name, content} pairs; each name is nested below the previous one
+   * @param prefix    leading component of every expected key
+   * @throws IOException if the output cannot be listed or read
+   */
+  private static void checkRecursiveChunkFiles(Configuration conf,
+                                               Path outputDir,
+                                               String[][] data,
+                                               String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    logger.info(" ----------- check_Recursive_ChunkFiles ------------");
+
+    // output exists?
+    FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("chunk-0", fileStatuses[0].getPath().getName());
+
+    // build the expected key -> content map; the path grows by one level per entry
+    Map<String, String> fileToData = Maps.newHashMap();
+    String currentPath = prefix;
+    for (String[] aData : data) {
+      currentPath += Path.SEPARATOR + aData[0];
+      fileToData.put(currentPath + Path.SEPARATOR + "file.txt", aData[1]);
+    }
+
+    // read a chunk to check content
+    // NOTE(review): the loop body never runs when the chunk holds no records, so an empty
+    // chunk passes; consider asserting the record count once the expected count is confirmed.
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(fileStatuses[0].getPath(), true, conf);
+    try {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        String key = record.getFirst().toString().trim();
+        String value = record.getSecond().toString().trim();
+        logger.info("{} >> {}", key, value);
+
+        String retrievedData = fileToData.get(key);
+        assertNotNull("Unexpected key in " + fileStatuses[0].getPath() + ": " + key, retrievedData);
+        assertEquals(retrievedData, value);
+      }
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+
+  /**
+   * Verifies the {@code part-m-00000} file written below {@code outputDir}.
+   * <p>
+   * The key expected for each entry of {@code data} is {@code prefix}, {@link Path#SEPARATOR}
+   * and the entry's name; the expected value is the entry's content. Every record read from the
+   * file must carry one of these keys together with the matching (trimmed) value.
+   *
+   * @param conf      configuration used to obtain the file system and to read the file
+   * @param outputDir job output directory; {@code /part-m-00000} is appended to locate the file
+   * @param data      {name, content} pairs describing the expected records
+   * @param prefix    leading component of every expected key
+   * @throws IOException if the output cannot be listed or read
+   */
+  private static void checkMRResultFiles(Configuration conf, Path outputDir,
+                                         String[][] data, String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    // output exists?
+    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
+
+    // build the expected key -> content map
+    Map<String, String> fileToData = Maps.newHashMap();
+    for (String[] aData : data) {
+      String expectedKey = prefix + Path.SEPARATOR + aData[0];
+      logger.info("map.put: {} {}", expectedKey, aData[1]);
+      fileToData.put(expectedKey, aData[1]);
+    }
+
+    // read a chunk to check content
+    // NOTE(review): the loop body never runs when the file holds no records, so an empty
+    // output passes; consider asserting the record count once the expected count is confirmed.
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(
+      fileStatuses[0].getPath(), true, conf);
+    try {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        String key = record.getFirst().toString().trim();
+        String value = record.getSecond().toString().trim();
+        logger.info("MR> {} >> {}", key, value);
+
+        String retrievedData = fileToData.get(key);
+        assertNotNull("Unexpected key in " + fileStatuses[0].getPath() + ": " + key, retrievedData);
+        assertEquals(retrievedData, value);
+      }
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
+
+  /**
+   * Verifies the {@code part-m-00000} file written below {@code outputDir} for a nested input
+   * tree.
+   * <p>
+   * The key expected for entry <i>i</i> of {@code data} is {@code prefix} followed by the names
+   * {@code data[0][0]} ... {@code data[i][0]} and {@code file.txt}, all joined with
+   * {@link Path#SEPARATOR}; the expected value is {@code data[i][1]}. Every record read from the
+   * file must carry one of these keys together with the matching (trimmed) value.
+   *
+   * @param conf      configuration used to obtain the file system and to read the file
+   * @param outputDir job output directory; {@code /part-m-00000} is appended to locate the file
+   * @param data      {name, content} pairs; each name is nested below the previous one
+   * @param prefix    leading component of every expected key
+   * @throws IOException if the output cannot be listed or read
+   */
+  private static void checkMRResultFilesRecursive(Configuration conf, Path outputDir,
+                                                  String[][] data, String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    // output exists?
+    FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles());
+    assertEquals(1, fileStatuses.length); // only one
+    assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
+
+    // build the expected key -> content map; the path grows by one level per entry
+    Map<String, String> fileToData = Maps.newHashMap();
+    String currentPath = prefix;
+    for (String[] aData : data) {
+      currentPath += Path.SEPARATOR + aData[0];
+      fileToData.put(currentPath + Path.SEPARATOR + "file.txt", aData[1]);
+    }
+
+    // read a chunk to check content
+    // NOTE(review): the loop body never runs when the file holds no records, so an empty
+    // output passes; consider asserting the record count once the expected count is confirmed.
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(
+      fileStatuses[0].getPath(), true, conf);
+    try {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        String key = record.getFirst().toString().trim();
+        logger.info("MR-Recur > Trying to check: {}", key);
+
+        String retrievedData = fileToData.get(key);
+        assertNotNull("Unexpected key in " + fileStatuses[0].getPath() + ": " + key, retrievedData);
+        assertEquals(retrievedData, record.getSecond().toString().trim());
+      }
+    } finally {
+      Closeables.close(iterator, true);
+    }
+  }
 }