Posted to commits@mahout.apache.org by sr...@apache.org on 2011/01/21 00:36:58 UTC
svn commit: r1061569 - in /mahout/trunk/utils/src:
main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Author: srowen
Date: Thu Jan 20 23:36:57 2011
New Revision: 1061569
URL: http://svn.apache.org/viewvc?rev=1061569&view=rev
Log:
MAHOUT-535 Operates now in terms of Path, so it can support local or HDFS file as input
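As a quick illustration of the change, here is a minimal sketch of driving the converter through ToolRunner, the same way main() does; with this commit the input and output are resolved as Hadoop Paths, so either a local directory or an HDFS location should work. The host, port, and directory names below are placeholders.

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.text.SequenceFilesFromDirectory;

public class SequenceFilesFromDirectoryExample {
  public static void main(String[] args) throws Exception {
    // Placeholder locations: a local directory or an hdfs:// URI are both
    // acceptable now that everything goes through the FileSystem API.
    ToolRunner.run(new SequenceFilesFromDirectory(), new String[] {
        "--input", "hdfs://localhost:9000/user/mahout/docs",
        "--output", "hdfs://localhost:9000/user/mahout/docs-seq",
        "--chunkSize", "64",
        "--charset", "UTF-8",
        "--keyPrefix", "UID"
    });
  }
}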
Added:
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Modified:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1061569&r1=1061568&r2=1061569&view=diff
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Thu Jan 20 23:36:57 2011
@@ -18,31 +18,24 @@
package org.apache.mahout.text;
import java.io.Closeable;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
-import org.apache.commons.cli2.CommandLine;
-import org.apache.commons.cli2.Group;
-import org.apache.commons.cli2.Option;
-import org.apache.commons.cli2.OptionException;
-import org.apache.commons.cli2.builder.ArgumentBuilder;
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
-import org.apache.commons.cli2.builder.GroupBuilder;
-import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.CommandLineUtil;
import org.apache.mahout.common.FileLineIterable;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,190 +45,172 @@ import org.slf4j.LoggerFactory;
* {@link SequenceFile}s of docid => content. The docid is set as the relative path of the document from the
* parent directory prepended with a specified prefix. You can also specify the input encoding of the text
 * files. The content of the output SequenceFiles is encoded as UTF-8 text.
- *
- *
*/
public final class SequenceFilesFromDirectory extends AbstractJob {
private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectory.class);
- private static ChunkedWriter createNewChunkedWriter(int chunkSizeInMB, String outputDir) throws IOException {
- return new ChunkedWriter(chunkSizeInMB, outputDir);
- }
-
- public void createSequenceFiles(File parentDir, String outputDir, String prefix, int chunkSizeInMB, Charset charset, String filter)
- throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException,
- InstantiationException {
- ChunkedWriter writer = createNewChunkedWriter(chunkSizeInMB, outputDir);
- if ("PrefixAdditionFilter".equals(filter)) {
- parentDir.listFiles(new PrefixAdditionFilter(prefix, writer, charset));
+ private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
+
+ public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
+ public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass","filter"};
+ public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
+ public static final String[] CHARSET_OPTION = {"charset", "c"};
+
+ public void run(Configuration conf,
+ Path input,
+ Path output,
+ String prefix,
+ int chunkSizeInMB,
+ Charset charset,
+ String fileFilterClassName)
+ throws IllegalArgumentException, InstantiationException, IllegalAccessException, InvocationTargetException,
+ IOException, SecurityException, NoSuchMethodException, ClassNotFoundException {
+ FileSystem fs = FileSystem.get(conf);
+ ChunkedWriter writer = new ChunkedWriter(conf, chunkSizeInMB, output);
+
+ PathFilter pathFilter;
+
+ if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+ pathFilter = new PrefixAdditionFilter(conf, prefix, writer, charset);
} else {
- Class filterClass = Class.forName(filter);
- Constructor<FileFilter> constructor = filterClass.getConstructor(String.class, ChunkedWriter.class, Charset.class);
- FileFilter fileFilter = constructor.newInstance(prefix, writer, charset);
- parentDir.listFiles(fileFilter);
+ Class<? extends PathFilter> pathFilterClass = Class.forName(fileFilterClassName).asSubclass(PathFilter.class);
+ Constructor<? extends PathFilter> constructor =
+ pathFilterClass.getConstructor(Configuration.class, String.class, ChunkedWriter.class, Charset.class);
+ pathFilter = constructor.newInstance(conf, prefix, writer, charset);
}
+ fs.listStatus(input, pathFilter);
writer.close();
}
+
+ private static final class ChunkedWriter implements Closeable {
- public static class ChunkedWriter implements Closeable {
private final int maxChunkSizeInBytes;
-
- private final String outputDir;
-
+ private final Path output;
private SequenceFile.Writer writer;
-
private int currentChunkID;
-
private int currentChunkSize;
-
- private final Configuration conf = new Configuration();
-
private final FileSystem fs;
-
- public ChunkedWriter(int chunkSizeInMB, String outputDir) throws IOException {
+ private final Configuration conf;
+
+ private ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output) throws IOException {
+ this.output = output;
+ this.conf = conf;
if (chunkSizeInMB > 1984) {
chunkSizeInMB = 1984;
}
maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
- this.outputDir = outputDir;
fs = FileSystem.get(conf);
currentChunkID = 0;
writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class);
}
-
+
private Path getPath(int chunkID) {
- return new Path(outputDir + "/chunk-" + chunkID);
+ return new Path(output, "chunk-" + chunkID);
}
-
+
public void write(String key, String value) throws IOException {
if (currentChunkSize > maxChunkSizeInBytes) {
writer.close();
writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID++), Text.class, Text.class);
currentChunkSize = 0;
-
}
-
+
Text keyT = new Text(key);
Text valueT = new Text(value);
currentChunkSize += keyT.getBytes().length + valueT.getBytes().length; // Overhead
writer.append(keyT, valueT);
}
-
+
@Override
public void close() throws IOException {
writer.close();
}
}
+
+ private final class PrefixAdditionFilter implements PathFilter {
- public class PrefixAdditionFilter implements FileFilter {
private final String prefix;
-
private final ChunkedWriter writer;
-
private final Charset charset;
-
- public PrefixAdditionFilter(String prefix, ChunkedWriter writer, Charset charset) {
+ private final Configuration conf;
+ private final FileSystem fs;
+
+ private PrefixAdditionFilter(Configuration conf, String prefix, ChunkedWriter writer, Charset charset)
+ throws IOException {
+ this.conf = conf;
this.prefix = prefix;
this.writer = writer;
this.charset = charset;
+ this.fs = FileSystem.get(conf);
}
-
+
@Override
- public boolean accept(File current) {
- if (current.isDirectory()) {
- current.listFiles(new PrefixAdditionFilter(prefix + File.separator + current.getName(), writer, charset));
- } else {
- try {
- StringBuilder file = new StringBuilder();
- for (String aFit : new FileLineIterable(current, charset, false)) {
- file.append(aFit).append('\n');
+ public boolean accept(Path current) {
+ log.debug("CURRENT: {}", current.getName());
+ try {
+ FileStatus[] fstatus = fs.listStatus(current);
+ for (FileStatus fst : fstatus) {
+ log.debug("CHILD: {}", fst.getPath().getName());
+ if (fst.isDir()) {
+ fs.listStatus(fst.getPath(),
+ new PrefixAdditionFilter(conf, prefix + Path.SEPARATOR + current.getName(), writer, charset));
+ } else {
+ StringBuilder file = new StringBuilder();
+ InputStream in = fs.open(fst.getPath());
+ for (String aFit : new FileLineIterable(in, charset, false)) {
+ file.append(aFit).append('\n');
+ }
+ String name = current.getName().equals(fst.getPath().getName())
+ ? current.getName()
+ : current.getName() + Path.SEPARATOR + fst.getPath().getName();
+ writer.write(prefix + Path.SEPARATOR + name, file.toString());
}
- writer.write(prefix + File.separator + current.getName(), file.toString());
-
- } catch (FileNotFoundException e) {
- // Skip file.
- } catch (IOException e) {
- // TODO: report exceptions and continue;
- throw new IllegalStateException(e);
}
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
}
return false;
}
-
}
public static void main(String[] args) throws Exception {
ToolRunner.run(new SequenceFilesFromDirectory(), args);
}
-
+
+ /*
+   * Callback from main(): ToolRunner invokes this after parsing the generic Hadoop options.
+ */
@Override
- public int run(String[] args) throws Exception {
- DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
- ArgumentBuilder abuilder = new ArgumentBuilder();
- GroupBuilder gbuilder = new GroupBuilder();
-
- Option parentOpt = obuilder.withLongName("input").withRequired(true).withArgument(abuilder.withName("input").withMinimum(1)
- .withMaximum(1).create()).withDescription("The input dir containing the documents").withShortName("i").create();
-
- Option outputDirOpt = obuilder.withLongName("output").withRequired(true).withArgument(abuilder.withName("output")
- .withMinimum(1).withMaximum(1).create()).withDescription("The output directory").withShortName("o").create();
-
- Option chunkSizeOpt = obuilder.withLongName("chunkSize").withArgument(abuilder.withName("chunkSize").withMinimum(1)
- .withMaximum(1).create()).withDescription("The chunkSize in MegaBytes. Defaults to 64").withShortName("chunk").create();
-
- Option keyPrefixOpt = obuilder.withLongName("keyPrefix").withArgument(abuilder.withName("keyPrefix").withMinimum(1)
- .withMaximum(1).create()).withDescription("The prefix to be prepended to the key").withShortName("prefix").create();
-
- Option charsetOpt = obuilder.withLongName("charset").withRequired(true).withArgument(abuilder.withName("charset")
- .withMinimum(1).withMaximum(1).create()).withDescription("The name of the character encoding of the input files")
- .withShortName("c").create();
-
- Option fileFilterOpt = obuilder.withLongName("fileFilterClass").withArgument(abuilder.withName("fileFilterClass")
- .withMinimum(1).withMaximum(1).create())
- .withDescription("The name of the class to use for file parsing. Default: PrefixAdditionFilter").withShortName("filter")
- .create();
-
- Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h").create();
-
- Group group = gbuilder.withName("Options").withOption(keyPrefixOpt).withOption(chunkSizeOpt).withOption(charsetOpt)
- .withOption(outputDirOpt).withOption(fileFilterOpt).withOption(helpOpt).withOption(parentOpt).create();
-
- try {
- Parser parser = new Parser();
- parser.setGroup(group);
- parser.setHelpOption(helpOpt);
- CommandLine cmdLine = parser.parse(args);
- if (cmdLine.hasOption(helpOpt)) {
- CommandLineUtil.printHelp(group);
- return -1;
- }
- File parentDir = new File((String) cmdLine.getValue(parentOpt));
- String outputDir = (String) cmdLine.getValue(outputDirOpt);
-
- int chunkSize = 64;
- if (cmdLine.hasOption(chunkSizeOpt)) {
- chunkSize = Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
- }
-
- String prefix = "";
- if (cmdLine.hasOption(keyPrefixOpt)) {
- prefix = (String) cmdLine.getValue(keyPrefixOpt);
- }
-
- String filter = "PrefixAdditionFilter";
- if (cmdLine.hasOption(fileFilterOpt)) {
- filter = (String) cmdLine.getValue(fileFilterOpt);
- }
-
- Charset charset = Charset.forName((String) cmdLine.getValue(charsetOpt));
- SequenceFilesFromDirectory dir = new SequenceFilesFromDirectory();
-
- dir.createSequenceFiles(parentDir, outputDir, prefix, chunkSize, charset, filter);
- } catch (OptionException e) {
- log.error("Exception", e);
- CommandLineUtil.printHelp(group);
- }
+ public int run(String[] args)
+ throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException,
+ InvocationTargetException {
+
+ addInputOption();
+ addOutputOption();
+ addOption(DefaultOptionCreator.overwriteOption().create());
+ addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
+ addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
+ "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
+ addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
+ addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+              "The name of the character encoding of the input files. Defaults to UTF-8", "UTF-8");
+
+ if (parseArguments(args) == null) {
+ return -1;
+ }
+
+ Path input = getInputPath();
+ Path output = getOutputPath();
+ if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+ HadoopUtil.overwriteOutput(output);
+ }
+ int chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+ String fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
+ String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+ Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+
+ run(getConf(), input, output, keyPrefix, chunkSize, charset, fileFilterClassName);
return 0;
}
}
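For callers that already hold Hadoop Path objects, the new run(Configuration, Path, Path, ...) overload added above can also be invoked directly instead of going through the command line. A rough sketch, assuming placeholder paths and the binary name of the nested default PrefixAdditionFilter:

import java.nio.charset.Charset;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.text.SequenceFilesFromDirectory;

public class DirectRunSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("/tmp/docs");        // placeholder; local or HDFS
    Path output = new Path("/tmp/docs-seq");   // placeholder
    // Assumed binary name of the nested default filter; run() matches this string
    // and constructs PrefixAdditionFilter itself rather than using reflection.
    String filter = "org.apache.mahout.text.SequenceFilesFromDirectory$PrefixAdditionFilter";
    new SequenceFilesFromDirectory().run(conf, input, output, "UID", 64,
        Charset.forName("UTF-8"), filter);
  }
}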
Added: mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1061569&view=auto
==============================================================================
--- mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (added)
+++ mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Thu Jan 20 23:36:57 2011
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.utils.MahoutTestCase;
+import org.junit.Before;
+import org.junit.Test;
+
+public final class TestSequenceFilesFromDirectory extends MahoutTestCase {
+
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+ private static final String[][] DATA1 = {
+ {"test1", "This is the first text."},
+ {"test2", "This is the second text."},
+ {"test3", "This is the third text."}
+ };
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+  /** Test: converting text files to SequenceFile */
+ @Test
+ public void testSequnceFileFromTsvBasic() throws Exception {
+ // parameters
+ Configuration conf = new Configuration();
+
+ FileSystem fs = FileSystem.get(conf);
+
+ // create
+ Path tmpDir = this.getTestTempDirPath();
+ Path inputDir = new Path(tmpDir, "inputDir");
+ fs.mkdirs(inputDir);
+ Path outputDir = new Path(tmpDir, "outputDir");
+
+ // prepare input files
+ createFilesFromArrays(conf, inputDir, DATA1);
+
+ String prefix = "UID";
+ SequenceFilesFromDirectory.main(new String[] {"--input",
+ inputDir.toString(), "--output", outputDir.toString(), "--chunkSize",
+ "64", "--charset",
+ UTF8.displayName(Locale.ENGLISH), "--keyPrefix", prefix});
+
+ // check output chunk files
+ checkChunkFiles(conf, outputDir, DATA1, prefix);
+ }
+
+ private static void createFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ for (String[] aData : data) {
+ OutputStreamWriter osw = new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), UTF8);
+ osw.write(aData[1]);
+ osw.close();
+ }
+ }
+
+ private static void checkChunkFiles(Configuration conf, Path outputDir, String[][] data, String prefix)
+ throws IOException, InstantiationException, IllegalAccessException {
+ FileSystem fs = FileSystem.get(conf);
+
+ // output exists?
+ FileStatus[] fstats = fs.listStatus(outputDir, new ExcludeDotFiles());
+ assertEquals(1, fstats.length); // only one
+ assertEquals("chunk-0", fstats[0].getPath().getName());
+
+ // read a chunk to check content
+ SequenceFile.Reader reader = new SequenceFile.Reader(fs, fstats[0].getPath(), conf);
+ assertEquals("org.apache.hadoop.io.Text", reader.getKeyClassName());
+ assertEquals("org.apache.hadoop.io.Text", reader.getValueClassName());
+ Writable key = reader.getKeyClass().asSubclass(Writable.class).newInstance();
+ Writable value = reader.getValueClass().asSubclass(Writable.class).newInstance();
+
+ Map<String,String> fileToData = new HashMap<String,String>();
+ for (String[] aData : data) {
+ fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+ }
+
+ for (String[] aData : data) {
+ assertTrue(reader.next(key, value));
+ String retrievedData = fileToData.get(key.toString().trim());
+ assertNotNull(retrievedData);
+ assertEquals(retrievedData, value.toString().trim());
+ }
+ reader.close();
+ }
+
+ /**
+ * exclude hidden (starting with dot) files
+ */
+ private static class ExcludeDotFiles implements PathFilter {
+ @Override
+ public boolean accept(Path file) {
+ return !file.getName().startsWith(".");
+ }
+ }
+
+}
+
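The resulting chunk files are plain Text/Text SequenceFiles (docid => content), so they can be read back outside the test as well. A small sketch with a placeholder output location:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadChunkSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path chunk = new Path("/tmp/docs-seq/chunk-0");  // placeholder output chunk
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, chunk, conf);
    Text key = new Text();    // docid: keyPrefix + relative path
    Text value = new Text();  // full document content
    while (reader.next(key, value)) {
      System.out.println(key + " (" + value.getLength() + " bytes)");
    }
    reader.close();
  }
}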