You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2011/01/21 00:36:58 UTC

svn commit: r1061569 - in /mahout/trunk/utils/src: main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java

Author: srowen
Date: Thu Jan 20 23:36:57 2011
New Revision: 1061569

URL: http://svn.apache.org/viewvc?rev=1061569&view=rev
Log:
MAHOUT-535 Now operates in terms of Path, so it can support local or HDFS files as input

Added:
    mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Modified:
    mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java

Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1061569&r1=1061568&r2=1061569&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Thu Jan 20 23:36:57 2011
@@ -18,31 +18,24 @@
 package org.apache.mahout.text;
 
 import java.io.Closeable;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.Charset;
 
-import org.apache.commons.cli2.CommandLine;
-import org.apache.commons.cli2.Group;
-import org.apache.commons.cli2.Option;
-import org.apache.commons.cli2.OptionException;
-import org.apache.commons.cli2.builder.ArgumentBuilder;
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
-import org.apache.commons.cli2.builder.GroupBuilder;
-import org.apache.commons.cli2.commandline.Parser;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.CommandLineUtil;
 import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,190 +45,172 @@ import org.slf4j.LoggerFactory;
  * {@link SequenceFile}s of docid => content. The docid is set as the relative path of the document from the
  * parent directory prepended with a specified prefix. You can also specify the input encoding of the text
  * files. The content of the output SequenceFiles are encoded as UTF-8 text.
- * 
- * 
  */
 public final class SequenceFilesFromDirectory extends AbstractJob {
 
   private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectory.class);
 
-  private static ChunkedWriter createNewChunkedWriter(int chunkSizeInMB, String outputDir) throws IOException {
-    return new ChunkedWriter(chunkSizeInMB, outputDir);
-  }
-
-  public void createSequenceFiles(File parentDir, String outputDir, String prefix, int chunkSizeInMB, Charset charset, String filter)
-      throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException,
-      InstantiationException {
-    ChunkedWriter writer = createNewChunkedWriter(chunkSizeInMB, outputDir);
-    if ("PrefixAdditionFilter".equals(filter)) {
-      parentDir.listFiles(new PrefixAdditionFilter(prefix, writer, charset));
+  private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
+  
+  public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
+  public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass","filter"};
+  public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
+  public static final String[] CHARSET_OPTION = {"charset", "c"};
+
+  public void run(Configuration conf,
+                  Path input,
+                  Path output,
+                  String prefix,
+                  int chunkSizeInMB,
+                  Charset charset,
+                  String fileFilterClassName)
+    throws IllegalArgumentException, InstantiationException, IllegalAccessException, InvocationTargetException,
+           IOException, SecurityException, NoSuchMethodException, ClassNotFoundException {
+    FileSystem fs = FileSystem.get(conf);
+    ChunkedWriter writer = new ChunkedWriter(conf, chunkSizeInMB, output);
+    
+    PathFilter pathFilter;
+    
+    if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+      pathFilter = new PrefixAdditionFilter(conf, prefix, writer, charset);
     } else {
-      Class filterClass = Class.forName(filter);
-      Constructor<FileFilter> constructor = filterClass.getConstructor(String.class, ChunkedWriter.class, Charset.class);
-      FileFilter fileFilter = constructor.newInstance(prefix, writer, charset);
-      parentDir.listFiles(fileFilter);
+      Class<? extends PathFilter> pathFilterClass = Class.forName(fileFilterClassName).asSubclass(PathFilter.class);
+      Constructor<? extends PathFilter> constructor =
+          pathFilterClass.getConstructor(Configuration.class, String.class, ChunkedWriter.class, Charset.class);
+      pathFilter = constructor.newInstance(conf, prefix, writer, charset);
     }
+    fs.listStatus(input, pathFilter);
     writer.close();
   }
+  
+  private static final class ChunkedWriter implements Closeable {
 
-  public static class ChunkedWriter implements Closeable {
     private final int maxChunkSizeInBytes;
-
-    private final String outputDir;
-
+    private final Path output;
     private SequenceFile.Writer writer;
-
     private int currentChunkID;
-
     private int currentChunkSize;
-
-    private final Configuration conf = new Configuration();
-
     private final FileSystem fs;
-
-    public ChunkedWriter(int chunkSizeInMB, String outputDir) throws IOException {
+    private final Configuration conf;
+    
+    private ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output) throws IOException {
+      this.output = output;
+      this.conf = conf;
       if (chunkSizeInMB > 1984) {
         chunkSizeInMB = 1984;
       }
       maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
-      this.outputDir = outputDir;
       fs = FileSystem.get(conf);
       currentChunkID = 0;
       writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class);
     }
-
+    
     private Path getPath(int chunkID) {
-      return new Path(outputDir + "/chunk-" + chunkID);
+      return new Path(output, "chunk-" + chunkID);
     }
-
+    
     public void write(String key, String value) throws IOException {
       if (currentChunkSize > maxChunkSizeInBytes) {
         writer.close();
         writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID++), Text.class, Text.class);
         currentChunkSize = 0;
-
       }
-
+      
       Text keyT = new Text(key);
       Text valueT = new Text(value);
       currentChunkSize += keyT.getBytes().length + valueT.getBytes().length; // Overhead
       writer.append(keyT, valueT);
     }
-
+    
     @Override
     public void close() throws IOException {
       writer.close();
     }
   }
+  
+  private final class PrefixAdditionFilter implements PathFilter {
 
-  public class PrefixAdditionFilter implements FileFilter {
     private final String prefix;
-
     private final ChunkedWriter writer;
-
     private final Charset charset;
-
-    public PrefixAdditionFilter(String prefix, ChunkedWriter writer, Charset charset) {
+    private final Configuration conf;
+    private final FileSystem fs;
+    
+    private PrefixAdditionFilter(Configuration conf, String prefix, ChunkedWriter writer, Charset charset)
+      throws IOException {
+      this.conf = conf;
       this.prefix = prefix;
       this.writer = writer;
       this.charset = charset;
+      this.fs = FileSystem.get(conf);
     }
-
+    
     @Override
-    public boolean accept(File current) {
-      if (current.isDirectory()) {
-        current.listFiles(new PrefixAdditionFilter(prefix + File.separator + current.getName(), writer, charset));
-      } else {
-        try {
-          StringBuilder file = new StringBuilder();
-          for (String aFit : new FileLineIterable(current, charset, false)) {
-            file.append(aFit).append('\n');
+    public boolean accept(Path current) {
+      log.debug("CURRENT: {}", current.getName());
+      try {
+        FileStatus[] fstatus = fs.listStatus(current);
+        for (FileStatus fst : fstatus) {
+          log.debug("CHILD: {}", fst.getPath().getName());
+          if (fst.isDir()) {
+            fs.listStatus(fst.getPath(),
+                          new PrefixAdditionFilter(conf, prefix + Path.SEPARATOR + current.getName(), writer, charset));
+          } else {
+            StringBuilder file = new StringBuilder();
+            InputStream in = fs.open(fst.getPath());
+            for (String aFit : new FileLineIterable(in, charset, false)) {
+              file.append(aFit).append('\n');
+            }
+            String name = current.getName().equals(fst.getPath().getName())
+                ? current.getName()
+                : current.getName() + Path.SEPARATOR + fst.getPath().getName();
+            writer.write(prefix + Path.SEPARATOR + name, file.toString());
           }
-          writer.write(prefix + File.separator + current.getName(), file.toString());
-
-        } catch (FileNotFoundException e) {
-          // Skip file.
-        } catch (IOException e) {
-          // TODO: report exceptions and continue;
-          throw new IllegalStateException(e);
         }
+      } catch (Exception e) {
+        throw new IllegalStateException(e);
       }
       return false;
     }
-
   }
 
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new SequenceFilesFromDirectory(), args);
   }
-
+  
+  /*
+   * Entry point invoked by ToolRunner after the standard Hadoop
+   * command-line parameters have been processed.
+   */
   @Override
-  public int run(String[] args) throws Exception {
-    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
-    ArgumentBuilder abuilder = new ArgumentBuilder();
-    GroupBuilder gbuilder = new GroupBuilder();
-
-    Option parentOpt = obuilder.withLongName("input").withRequired(true).withArgument(abuilder.withName("input").withMinimum(1)
-        .withMaximum(1).create()).withDescription("The input dir containing the documents").withShortName("i").create();
-
-    Option outputDirOpt = obuilder.withLongName("output").withRequired(true).withArgument(abuilder.withName("output")
-        .withMinimum(1).withMaximum(1).create()).withDescription("The output directory").withShortName("o").create();
-
-    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(abuilder.withName("chunkSize").withMinimum(1)
-        .withMaximum(1).create()).withDescription("The chunkSize in MegaBytes. Defaults to 64").withShortName("chunk").create();
-
-    Option keyPrefixOpt = obuilder.withLongName("keyPrefix").withArgument(abuilder.withName("keyPrefix").withMinimum(1)
-        .withMaximum(1).create()).withDescription("The prefix to be prepended to the key").withShortName("prefix").create();
-
-    Option charsetOpt = obuilder.withLongName("charset").withRequired(true).withArgument(abuilder.withName("charset")
-        .withMinimum(1).withMaximum(1).create()).withDescription("The name of the character encoding of the input files")
-        .withShortName("c").create();
-
-    Option fileFilterOpt = obuilder.withLongName("fileFilterClass").withArgument(abuilder.withName("fileFilterClass")
-        .withMinimum(1).withMaximum(1).create())
-        .withDescription("The name of the class to use for file parsing. Default: PrefixAdditionFilter").withShortName("filter")
-        .create();
-
-    Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h").create();
-
-    Group group = gbuilder.withName("Options").withOption(keyPrefixOpt).withOption(chunkSizeOpt).withOption(charsetOpt)
-        .withOption(outputDirOpt).withOption(fileFilterOpt).withOption(helpOpt).withOption(parentOpt).create();
-
-    try {
-      Parser parser = new Parser();
-      parser.setGroup(group);
-      parser.setHelpOption(helpOpt);
-      CommandLine cmdLine = parser.parse(args);
-      if (cmdLine.hasOption(helpOpt)) {
-        CommandLineUtil.printHelp(group);
-        return -1;
-      }
-      File parentDir = new File((String) cmdLine.getValue(parentOpt));
-      String outputDir = (String) cmdLine.getValue(outputDirOpt);
-
-      int chunkSize = 64;
-      if (cmdLine.hasOption(chunkSizeOpt)) {
-        chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
-      }
-
-      String prefix = "";
-      if (cmdLine.hasOption(keyPrefixOpt)) {
-        prefix = (String) cmdLine.getValue(keyPrefixOpt);
-      }
-
-      String filter = "PrefixAdditionFilter";
-      if (cmdLine.hasOption(fileFilterOpt)) {
-        filter = (String) cmdLine.getValue(fileFilterOpt);
-      }
-
-      Charset charset = Charset.forName((String) cmdLine.getValue(charsetOpt));
-      SequenceFilesFromDirectory dir = new SequenceFilesFromDirectory();
-
-      dir.createSequenceFiles(parentDir, outputDir, prefix, chunkSize, charset, filter);
-    } catch (OptionException e) {
-      log.error("Exception", e);
-      CommandLineUtil.printHelp(group);
-    }
+  public int run(String[] args)
+    throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException,
+           InvocationTargetException {
+    
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
+    addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
+        "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
+    addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
+    addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+        "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+    
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+    
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.overwriteOutput(output);
+    }
+    int chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+    String fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
+    String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+    Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+    
+    run(getConf(), input, output, keyPrefix, chunkSize, charset, fileFilterClassName);
     return 0;
   }
 }

Added: mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1061569&view=auto
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (added)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Thu Jan 20 23:36:57 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.utils.MahoutTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+public final class TestSequenceFilesFromDirectory extends MahoutTestCase {
+
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final String[][] DATA1 = {
+      {"test1", "This is the first text."},
+      {"test2", "This is the second text."},
+      {"test3", "This is the third text."}
+  };
+
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  /** Tests converting a directory of plain text files into SequenceFile chunks. */
+  @Test
+  public void testSequnceFileFromTsvBasic() throws Exception {
+    // parameters
+    Configuration conf = new Configuration();
+    
+    FileSystem fs = FileSystem.get(conf);
+    
+    // create
+    Path tmpDir = this.getTestTempDirPath();
+    Path inputDir = new Path(tmpDir, "inputDir");
+    fs.mkdirs(inputDir);
+    Path outputDir = new Path(tmpDir, "outputDir");
+    
+    // prepare input files
+    createFilesFromArrays(conf, inputDir, DATA1);
+
+    String prefix = "UID";
+    SequenceFilesFromDirectory.main(new String[] {"--input",
+        inputDir.toString(), "--output", outputDir.toString(), "--chunkSize",
+        "64", "--charset",
+        UTF8.displayName(Locale.ENGLISH), "--keyPrefix", prefix});
+    
+    // check output chunk files
+    checkChunkFiles(conf, outputDir, DATA1, prefix);
+  }
+
+  private static void createFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    for (String[] aData : data) {
+      OutputStreamWriter osw = new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), UTF8);
+      osw.write(aData[1]);
+      osw.close();
+    }
+  }
+
+  private static void checkChunkFiles(Configuration conf, Path outputDir, String[][] data, String prefix)
+    throws IOException, InstantiationException, IllegalAccessException {
+    FileSystem fs = FileSystem.get(conf);
+    
+    // output exists?
+    FileStatus[] fstats = fs.listStatus(outputDir, new ExcludeDotFiles());
+    assertEquals(1, fstats.length); // only one
+    assertEquals("chunk-0", fstats[0].getPath().getName());
+    
+    // read a chunk to check content
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, fstats[0].getPath(), conf);
+    assertEquals("org.apache.hadoop.io.Text", reader.getKeyClassName());
+    assertEquals("org.apache.hadoop.io.Text", reader.getValueClassName());
+    Writable key = reader.getKeyClass().asSubclass(Writable.class).newInstance();
+    Writable value = reader.getValueClass().asSubclass(Writable.class).newInstance();
+    
+    Map<String,String> fileToData = new HashMap<String,String>();
+    for (String[] aData : data) {
+      fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+    }
+
+    for (String[] aData : data) {
+      assertTrue(reader.next(key, value));
+      String retrievedData = fileToData.get(key.toString().trim());
+      assertNotNull(retrievedData);
+      assertEquals(retrievedData, value.toString().trim());
+    }
+    reader.close();
+  }
+  
+  /**
+   * Excludes hidden files (names starting with a dot).
+   */
+  private static class ExcludeDotFiles implements PathFilter {
+    @Override
+    public boolean accept(Path file) {
+      return !file.getName().startsWith(".");
+    }
+  }
+
+}
+