You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/06/23 20:15:57 UTC
svn commit: r1495863 - in /mahout/trunk:
core/src/main/java/org/apache/mahout/common/
core/src/main/java/org/apache/mahout/common/iterator/
integration/src/main/java/org/apache/mahout/text/
integration/src/test/java/org/apache/mahout/text/
Author: smarthi
Date: Sun Jun 23 18:15:56 2013
New Revision: 1495863
URL: http://svn.apache.org/r1495863
Log:
MAHOUT-833: Make conversion to sequence files map-reduce - Checking in, tests pass
Added:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Sun Jun 23 18:15:56 2013
@@ -36,6 +36,7 @@ import org.apache.commons.cli2.builder.G
import org.apache.commons.cli2.commandline.Parser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
@@ -116,6 +117,34 @@ public abstract class AbstractJob extend
options = Lists.newLinkedList();
}
+ /**
+ * Builds a comma-separated list of input splits
+ */
+ public static String buildDirList(FileSystem fs, FileStatus fileStatus) throws IOException {
+ StringBuilder dirList = new StringBuilder();
+ boolean bContainsFiles = false;
+
+ for (FileStatus childFileStatus : fs.listStatus(fileStatus.getPath())) {
+ if (childFileStatus.isDir()) {
+ String subDirectoryList = buildDirList(fs, childFileStatus);
+ if (subDirectoryList.length() > 0 && dirList.length() > 0) {
+ dirList.append(",");
+ }
+ dirList.append(subDirectoryList);
+ } else {
+ bContainsFiles = true;
+ }
+ }
+
+ if (bContainsFiles) {
+ if (dirList.length() > 0) {
+ dirList.append(",");
+ }
+ dirList.append(fileStatus.getPath().toUri().getPath());
+ }
+ return dirList.toString();
+ }
+
/** Returns the input path established by a call to {@link #parseArguments(String[])}.
* The source of the path may be an input option added using {@link #addInputOption()}
* or it may be the value of the {@code mapred.input.dir} configuration
@@ -634,7 +663,6 @@ public abstract class AbstractJob extend
// you can't instantiate it
//ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
AnalyzerUtils.createAnalyzer(analyzerClass);
-
}
return analyzerClass;
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterable.java Sun Jun 23 18:15:56 2013
@@ -31,12 +31,17 @@ import com.google.common.base.Charsets;
* defines lines.
*
* This class will uncompress files that end in .zip or .gz accordingly, too.
+ *
+ * If you pass the class a stream of compressed bytes, also pass the original file name
+ * to the constructor; a .gz or .zip extension causes the bytes to be decompressed.
+ *
*/
public final class FileLineIterable implements Iterable<String> {
private final InputStream is;
private final Charset encoding;
private final boolean skipFirstLine;
+ private final String origFilename;
/** Creates an iterable over a given file, assuming a UTF-8 encoding. */
public FileLineIterable(File file) throws IOException {
@@ -65,12 +70,21 @@ public final class FileLineIterable impl
this.is = is;
this.encoding = encoding;
this.skipFirstLine = skipFirstLine;
+ this.origFilename = "";
+ }
+
/**
 * Creates an iterable over the lines of {@code is}, decoded with {@code encoding}.
 *
 * @param is            the stream to read lines from
 * @param encoding      character encoding of the (decompressed) bytes
 * @param skipFirstLine whether the first line should be dropped
 * @param filename      name of the file the bytes originally came from; its extension
 *                      ({@code .gz} / {@code .zip}) decides whether the stream is decompressed.
 *                      {@code null} is treated like an empty name, i.e. the stream is read as-is.
 */
public FileLineIterable(InputStream is, Charset encoding, boolean skipFirstLine, String filename) {
  this.is = is;
  this.encoding = encoding;
  this.skipFirstLine = skipFirstLine;
  // Normalize null to the same "" the other constructors use, so iterator() never passes null on.
  this.origFilename = filename == null ? "" : filename;
}
+
@Override
public Iterator<String> iterator() {
try {
- return new FileLineIterator(is, encoding, skipFirstLine);
+ return new FileLineIterator(is, encoding, skipFirstLine, this.origFilename);
} catch (IOException ioe) {
throw new IllegalStateException(ioe);
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/FileLineIterator.java Sun Jun 23 18:15:56 2013
@@ -31,6 +31,7 @@ import java.util.zip.ZipInputStream;
import com.google.common.base.Charsets;
import com.google.common.collect.AbstractIterator;
import com.google.common.io.Closeables;
+import com.google.common.io.Files;
import org.apache.mahout.cf.taste.impl.common.SkippingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* Iterates over the lines of a text file. This assumes the text file's lines are delimited in a manner
* consistent with how {@link BufferedReader} defines lines.
- *
+ * <p/>
* This class will uncompress files that end in .zip or .gz accordingly, too.
*/
public final class FileLineIterator extends AbstractIterator<String> implements SkippingIterator<String>, Closeable {
@@ -47,11 +48,10 @@ public final class FileLineIterator exte
private static final Logger log = LoggerFactory.getLogger(FileLineIterator.class);
- /**
- * Creates a over a given file, assuming a UTF-8 encoding.
- *
- * @throws java.io.FileNotFoundException
- * if the file does not exist
+ /**
+ * Creates an iterator over a given file, assuming a UTF-8 encoding.
+ *
+ * @throws java.io.FileNotFoundException if the file does not exist
* @throws IOException
* if the file cannot be read
*/
@@ -59,52 +59,66 @@ public final class FileLineIterator exte
public FileLineIterator(File file) throws IOException {
this(file, Charsets.UTF_8, false);
}
-
+
/**
* Creates an iterator over a given file, assuming a UTF-8 encoding.
- *
- * @throws java.io.FileNotFoundException
- * if the file does not exist
- * @throws IOException
- * if the file cannot be read
+ *
+ * @throws java.io.FileNotFoundException if the file does not exist
+ * @throws IOException if the file cannot be read
*/
public FileLineIterator(File file, boolean skipFirstLine) throws IOException {
this(file, Charsets.UTF_8, skipFirstLine);
}
-
+
/**
* Creates an iterator over a given file, using the given encoding.
- *
- * @throws java.io.FileNotFoundException
- * if the file does not exist
- * @throws IOException
- * if the file cannot be read
+ *
+ * @throws java.io.FileNotFoundException if the file does not exist
+ * @throws IOException if the file cannot be read
*/
public FileLineIterator(File file, Charset encoding, boolean skipFirstLine) throws IOException {
this(getFileInputStream(file), encoding, skipFirstLine);
}
-
+
public FileLineIterator(InputStream is) throws IOException {
this(is, Charsets.UTF_8, false);
}
-
+
public FileLineIterator(InputStream is, boolean skipFirstLine) throws IOException {
this(is, Charsets.UTF_8, skipFirstLine);
}
-
+
public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine) throws IOException {
reader = new BufferedReader(new InputStreamReader(is, encoding));
if (skipFirstLine) {
reader.readLine();
}
}
-
+
+ public FileLineIterator(InputStream is, Charset encoding, boolean skipFirstLine, String filename)
+ throws IOException {
+ InputStream compressedInputStream;
+
+ if ("gz".equalsIgnoreCase(Files.getFileExtension(filename.toLowerCase()))) {
+ compressedInputStream = new GZIPInputStream(is);
+ } else if ("zip".equalsIgnoreCase(Files.getFileExtension(filename.toLowerCase()))) {
+ compressedInputStream = new ZipInputStream(is);
+ } else {
+ compressedInputStream = is;
+ }
+
+ reader = new BufferedReader(new InputStreamReader(compressedInputStream, encoding));
+ if (skipFirstLine) {
+ reader.readLine();
+ }
+ }
+
static InputStream getFileInputStream(File file) throws IOException {
InputStream is = new FileInputStream(file);
String name = file.getName();
- if (name.endsWith(".gz")) {
+ if ("gz".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) {
return new GZIPInputStream(is);
- } else if (name.endsWith(".zip")) {
+ } else if ("zip".equalsIgnoreCase(Files.getFileExtension(name.toLowerCase()))) {
return new ZipInputStream(is);
} else {
return is;
@@ -127,7 +141,7 @@ public final class FileLineIterator exte
return line == null ? endOfData() : line;
}
-
+
@Override
public void skip(int n) {
try {
@@ -144,11 +158,11 @@ public final class FileLineIterator exte
}
}
}
-
+
@Override
public void close() throws IOException {
endOfData();
Closeables.close(reader, true);
}
-
+
}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+/**
+ *
+ * Used in combining a large number of text files into one text input reader
+ * along with the WholeFileRecordReader class.
+ *
+ */
+public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> {
+
+ @Override
+ public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ return new CombineFileRecordReader<IntWritable, BytesWritable>((CombineFileSplit) inputSplit,
+ taskAttemptContext, WholeFileRecordReader.class);
+ }
+}
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Sun Jun 23 18:15:56 2013
@@ -17,17 +17,24 @@
package org.apache.mahout.text;
-import java.lang.reflect.Constructor;
+import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Map;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.apache.mahout.utils.io.ChunkedWriter;
@@ -42,38 +49,50 @@ import org.apache.mahout.utils.io.Chunke
public class SequenceFilesFromDirectory extends AbstractJob {
private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
-
+
private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
- private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass","filter"};
+ private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
private static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
private static final String[] CHARSET_OPTION = {"charset", "c"};
public static void main(String[] args) throws Exception {
ToolRunner.run(new SequenceFilesFromDirectory(), args);
}
-
+
/*
- * callback main after processing hadoop parameters
- */
+ * callback main after processing MapReduce parameters
+ */
@Override
public int run(String[] args) throws Exception {
- addOptions();
-
+ addOptions();
+ addOption(DefaultOptionCreator.methodOption().create());
+ addOption(DefaultOptionCreator.overwriteOption().create());
+
if (parseArguments(args) == null) {
return -1;
}
-
+
Map<String, String> options = parseOptions();
- Path input = getInputPath();
Path output = getOutputPath();
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
- Configuration conf = new Configuration();
- HadoopUtil.delete(conf, output);
+ HadoopUtil.delete(getConf(), output);
}
- String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+ if (getOption(DefaultOptionCreator.METHOD_OPTION,
+ DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+ runSequential(getConf(), getInputPath(), output, options);
+ } else {
+ runMapReduce(getInputPath(), output);
+ }
+
+ return 0;
+ }
+
+ private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options)
+ throws IOException, InterruptedException, NoSuchMethodException {
+ // Running sequentially
Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
- Configuration conf = getConf();
+ String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
FileSystem fs = FileSystem.get(input.toUri(), conf);
ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output);
@@ -83,16 +102,9 @@ public class SequenceFilesFromDirectory
if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
} else {
- Class<? extends SequenceFilesFromDirectoryFilter> pathFilterClass =
- Class.forName(fileFilterClassName).asSubclass(SequenceFilesFromDirectoryFilter.class);
- Constructor<? extends SequenceFilesFromDirectoryFilter> constructor =
- pathFilterClass.getConstructor(Configuration.class,
- String.class,
- Map.class,
- ChunkedWriter.class,
- Charset.class,
- FileSystem.class);
- pathFilter = constructor.newInstance(conf, keyPrefix, options, writer, charset, fs);
+ pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
+ new Class[]{Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
+ new Object[]{conf, keyPrefix, options, writer, charset, fs});
}
fs.listStatus(input, pathFilter);
} finally {
@@ -101,25 +113,69 @@ public class SequenceFilesFromDirectory
return 0;
}
+ private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException {
+
+ int chunkSizeInMB = 64;
+ if (hasOption(CHUNK_SIZE_OPTION[0])) {
+ chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+ }
+
+ String keyPrefix = null;
+ if (hasOption(KEY_PREFIX_OPTION[0])) {
+ keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
+ }
+
+ // Prepare Job for submission.
+ Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
+ SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
+ SequenceFileOutputFormat.class, "SequenceFilesFromDirectory");
+
+ Configuration jobConfig = job.getConfiguration();
+ jobConfig.set("keyPrefix", keyPrefix);
+ FileSystem fs = FileSystem.get(jobConfig);
+ FileStatus fsFileStatus = fs.getFileStatus(input);
+ String inputDirList = buildDirList(fs, fsFileStatus);
+ jobConfig.set("baseinputpath", input.toString());
+
+ long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
+
+ // set the max split locations, otherwise we get nasty debug stuff
+ jobConfig.set("mapreduce.job.max.split.locations", "1000000");
+
+ FileInputFormat.setInputPaths(job, inputDirList);
+ // need to set this to a multiple of the block size, or no split happens
+ FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+ FileOutputFormat.setCompressOutput(job, true);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ return -1;
+ }
+ return 0;
+ }
+
/**
* Override this method in order to add additional options to the command line of the SequenceFileFromDirectory job.
* Do not forget to call super() otherwise all standard options (input/output dirs etc) will not be available.
- * */
+ */
protected void addOptions() {
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.overwriteOption().create());
+ addOption(DefaultOptionCreator.methodOption().create());
addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
- "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
+ "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
- "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+ "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
}
/**
* Override this method in order to parse your additional options from the command line. Do not forget to call
* super() otherwise standard options (input/output dirs etc) will not be available.
+ *
+ * @return Map of options
*/
protected Map<String, String> parseOptions() {
Map<String, String> options = Maps.newHashMap();
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+
+import java.io.IOException;
+
+/**
+ * Map class for SequenceFilesFromDirectory MR job
+ */
+public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+
+ private String keyPrefix;
+ private Text fileValue = new Text();
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ this.keyPrefix = context.getConfiguration().get("keyPrefix", "");
+ }
+
+ public void map(IntWritable key, BytesWritable value, Context context)
+ throws IOException, InterruptedException {
+
+ Configuration configuration = context.getConfiguration();
+ Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+ String relativeFilePath = SequenceFilesFromMailArchivesMapper.calcRelativeFilePath(configuration, filePath);
+
+ String filename = this.keyPrefix.length() > 0 ?
+ this.keyPrefix + Path.SEPARATOR + relativeFilePath :
+ Path.SEPARATOR + relativeFilePath;
+
+ fileValue.set(value.getBytes(), 0, value.getBytes().length);
+ context.write(new Text(filename), fileValue);
+ }
+}
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java Sun Jun 23 18:15:56 2013
@@ -20,17 +20,23 @@ import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import org.apache.commons.cli2.builder.ArgumentBuilder;
import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
@@ -48,21 +54,16 @@ import org.slf4j.LoggerFactory;
*/
public final class SequenceFilesFromMailArchives extends AbstractJob {
- private static final Logger log = LoggerFactory.getLogger(
- SequenceFilesFromMailArchives.class);
+ private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class);
public void createSequenceFiles(MailOptions options) throws IOException {
- ChunkedWriter writer = new ChunkedWriter(
- getConf(), options.getChunkSize(), new Path(options.getOutputDir()));
- MailProcessor processor = new MailProcessor(
- options, options.getPrefix(), writer);
+ ChunkedWriter writer = new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()));
+ MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer);
try {
if (options.getInput().isDirectory()) {
- PrefixAdditionFilter filter = new PrefixAdditionFilter(
- processor, writer);
+ PrefixAdditionFilter filter = new PrefixAdditionFilter(processor, writer);
options.getInput().listFiles(filter);
- log.info("Parsed {} messages from {}", filter.getMessageCount(),
- options.getInput().getAbsolutePath());
+ log.info("Parsed {} messages from {}", filter.getMessageCount(), options.getInput().getAbsolutePath());
} else {
long start = System.currentTimeMillis();
long cnt = processor.parseMboxLineByLine(options.getInput());
@@ -94,12 +95,11 @@ public final class SequenceFilesFromMail
if (current.isDirectory()) {
log.info("At {}", current.getAbsolutePath());
PrefixAdditionFilter nested = new PrefixAdditionFilter(
- new MailProcessor(processor.getOptions(), processor.getPrefix()
- + File.separator + current.getName(), writer), writer);
+ new MailProcessor(processor.getOptions(), processor.getPrefix()
+ + File.separator + current.getName(), writer), writer);
current.listFiles(nested);
long dirCount = nested.getMessageCount();
- log.info("Parsed {} messages from directory {}", dirCount,
- current.getAbsolutePath());
+ log.info("Parsed {} messages from directory {}", dirCount, current.getAbsolutePath());
messageCount += dirCount;
} else {
try {
@@ -118,69 +118,69 @@ public final class SequenceFilesFromMail
@Override
public int run(String[] args) throws Exception {
- DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
- ArgumentBuilder abuilder = new ArgumentBuilder();
- //GroupBuilder gbuilder = new GroupBuilder();
+ DefaultOptionBuilder optionBuilder = new DefaultOptionBuilder();
+ ArgumentBuilder argumentBuilder = new ArgumentBuilder();
addInputOption();
addOutputOption();
+ addOption(DefaultOptionCreator.methodOption().create());
- addOption(obuilder.withLongName("chunkSize").withArgument(
- abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create())
- .withDescription("The chunkSize in MegaBytes. Defaults to 64")
- .withShortName("chunk").create());
-
- addOption(obuilder.withLongName("keyPrefix").withArgument(
- abuilder.withName("keyPrefix").withMinimum(1).withMaximum(1).create())
- .withDescription("The prefix to be prepended to the key")
- .withShortName("prefix").create());
- addOption(obuilder.withLongName("charset")
- .withRequired(true).withArgument(abuilder.withName("charset")
- .withMinimum(1).withMaximum(1).create()).withDescription(
- "The name of the character encoding of the input files")
- .withShortName("c").create());
- addOption(obuilder.withLongName("subject")
- .withRequired(false).withDescription(
- "Include the Mail subject as part of the text. Default is false")
- .withShortName("s").create());
- addOption(obuilder.withLongName("to").withRequired(false)
- .withDescription("Include the to field in the text. Default is false")
- .withShortName("to").create());
- addOption(obuilder.withLongName("from").withRequired(false).withDescription(
- "Include the from field in the text. Default is false")
- .withShortName("from").create());
- addOption(obuilder.withLongName("references")
- .withRequired(false).withDescription(
- "Include the references field in the text. Default is false")
- .withShortName("refs").create());
- addOption(obuilder.withLongName("body").withRequired(false)
- .withDescription("Include the body in the output. Default is false")
- .withShortName("b").create());
- addOption(obuilder.withLongName("stripQuoted")
- .withRequired(false).withDescription(
- "Strip (remove) quoted email text in the body. Default is false")
- .withShortName("q").create());
+ addOption(optionBuilder.withLongName("chunkSize").withArgument(
+ argumentBuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create())
+ .withDescription("The chunkSize in MegaBytes. Defaults to 64")
+ .withShortName("chunk").create());
+
+ addOption(optionBuilder.withLongName("keyPrefix").withArgument(
+ argumentBuilder.withName("keyPrefix").withMinimum(1).withMaximum(1).create())
+ .withDescription("The prefix to be prepended to the key")
+ .withShortName("prefix").create());
+ addOption(optionBuilder.withLongName("charset")
+ .withRequired(true).withArgument(argumentBuilder.withName("charset")
+ .withMinimum(1).withMaximum(1).create()).withDescription(
+ "The name of the character encoding of the input files")
+ .withShortName("c").create());
+ addOption(optionBuilder.withLongName("subject")
+ .withRequired(false).withDescription(
+ "Include the Mail subject as part of the text. Default is false")
+ .withShortName("s").create());
+ addOption(optionBuilder.withLongName("to").withRequired(false)
+ .withDescription("Include the to field in the text. Default is false")
+ .withShortName("to").create());
+ addOption(optionBuilder.withLongName("from").withRequired(false).withDescription(
+ "Include the from field in the text. Default is false")
+ .withShortName("from").create());
+ addOption(optionBuilder.withLongName("references")
+ .withRequired(false).withDescription(
+ "Include the references field in the text. Default is false")
+ .withShortName("refs").create());
+ addOption(optionBuilder.withLongName("body").withRequired(false)
+ .withDescription("Include the body in the output. Default is false")
+ .withShortName("b").create());
+ addOption(optionBuilder.withLongName("stripQuoted")
+ .withRequired(false).withDescription(
+ "Strip (remove) quoted email text in the body. Default is false")
+ .withShortName("q").create());
addOption(
- obuilder.withLongName("quotedRegex")
- .withRequired(false).withArgument(abuilder.withName("regex")
- .withMinimum(1).withMaximum(1).create()).withDescription(
- "Specify the regex that identifies quoted text. "
- + "Default is to look for > or | at the beginning of the line.")
- .withShortName("q").create());
+ optionBuilder.withLongName("quotedRegex")
+ .withRequired(false).withArgument(argumentBuilder.withName("regex")
+ .withMinimum(1).withMaximum(1).create()).withDescription(
+ "Specify the regex that identifies quoted text. "
+ + "Default is to look for > or | at the beginning of the line.")
+ .withShortName("q").create());
addOption(
- obuilder.withLongName("separator")
- .withRequired(false).withArgument(abuilder.withName("separator")
- .withMinimum(1).withMaximum(1).create()).withDescription(
- "The separator to use between metadata items (to, from, etc.). Default is \\n")
- .withShortName("sep").create());
+ optionBuilder.withLongName("separator")
+ .withRequired(false).withArgument(argumentBuilder.withName("separator")
+ .withMinimum(1).withMaximum(1).create()).withDescription(
+ "The separator to use between metadata items (to, from, etc.). Default is \\n")
+ .withShortName("sep").create());
addOption(
- obuilder.withLongName("bodySeparator")
- .withRequired(false).withArgument(abuilder.withName("bodySeparator")
- .withMinimum(1).withMaximum(1).create()).withDescription(
- "The separator to use between lines in the body. Default is \\n. "
- + "Useful to change if you wish to have the message be on one line")
- .withShortName("bodySep").create());
+ optionBuilder.withLongName("bodySeparator")
+ .withRequired(false).withArgument(argumentBuilder.withName("bodySeparator")
+ .withMinimum(1).withMaximum(1).create()).withDescription(
+ "The separator to use between lines in the body. Default is \\n. "
+ + "Useful to change if you wish to have the message be on one line")
+ .withShortName("bodySep").create());
addOption(DefaultOptionCreator.helpOption());
Map<String, List<String>> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
@@ -209,10 +209,9 @@ public final class SequenceFilesFromMail
List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
// patternOrder is used downstream so that we can know what order the text
- // is in instead
- // of encoding it in the string, which
+ // is in instead of encoding it in the string, which
// would require more processing later to remove it pre feature selection.
- Map<String,Integer> patternOrder = new HashMap<String,Integer>();
+ Map<String, Integer> patternOrder = Maps.newHashMap();
int order = 0;
if (hasOption("from")) {
patterns.add(MailProcessor.FROM_PREFIX);
@@ -245,10 +244,101 @@ public final class SequenceFilesFromMail
if (hasOption("quotedRegex")) {
options.setQuotedTextPattern(Pattern.compile(getOption("quotedRegex")));
}
+
+ if (getOption(DefaultOptionCreator.METHOD_OPTION,
+ DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
+ runSequential(options);
+ } else {
+ runMapReduce(getInputPath(), getOutputPath());
+ }
+
+ return 0;
+ }
+
+ /**
+ * Sequential (non-map-reduce) path, chosen when --method is "sequential": delegates to
+ * createSequenceFiles(options) and logs how long the conversion took.
+ *
+ * @param options mail parsing options assembled from the command-line flags in run()
+ * @return always 0
+ */
+ private int runSequential(MailOptions options)
+ throws IOException, InterruptedException, NoSuchMethodException {
+
long start = System.currentTimeMillis();
createSequenceFiles(options);
long finish = System.currentTimeMillis();
log.info("Conversion took {}ms", finish - start);
+
+ return 0;
+ }
+
+  /**
+   * Converts the mail archives with a map-only Hadoop job that runs
+   * {@link SequenceFilesFromMailArchivesMapper}. Command-line options are copied into the job
+   * configuration under the keys the mapper's setup() reads back.
+   *
+   * @param input  root of the archive tree; the job's input directory list is built from it by buildDirList
+   * @param output directory the job writes its SequenceFile output to
+   * @return 0 if the job succeeded, -1 otherwise
+   */
+  private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
+
+    Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class,
+        Text.class, Text.class, SequenceFileOutputFormat.class, "SequentialFilesFromMailArchives");
+
+    Configuration jobConfig = job.getConfiguration();
+
+    if (hasOption("keyPrefix")) {
+      jobConfig.set("prefix", getOption("keyPrefix"));
+    }
+
+    int chunkSize = 0;
+    if (hasOption("chunkSize")) {
+      chunkSize = Integer.parseInt(getOption("chunkSize"));
+      jobConfig.set("chunkSize", String.valueOf(chunkSize));
+    }
+
+    if (hasOption("charset")) {
+      // name() is the canonical charset name, which the mapper hands back to Charset.forName()
+      jobConfig.set("charset", Charset.forName(getOption("charset")).name());
+    }
+
+    if (hasOption("from")) {
+      jobConfig.set("fromOpt", "true");
+    }
+
+    if (hasOption("to")) {
+      jobConfig.set("toOpt", "true");
+    }
+
+    if (hasOption("references")) {
+      jobConfig.set("refsOpt", "true");
+    }
+
+    if (hasOption("subject")) {
+      jobConfig.set("subjectOpt", "true");
+    }
+
+    if (hasOption("quotedRegex")) {
+      // compile on the client so a malformed regex fails before submission;
+      // the mapper reads the pattern back from "quotedRegexOpt"
+      jobConfig.set("quotedRegexOpt", Pattern.compile(getOption("quotedRegex")).pattern());
+    }
+
+    // the option is registered as "separator"; the mapper reads it as "separatorOpt" and
+    // keeps its own "\n" default when the value is empty
+    String separator = hasOption("separator") ? getOption("separator") : null;
+    jobConfig.set("separatorOpt", separator == null ? "\n" : separator);
+
+    if (hasOption("bodySeparator")) {
+      String bodySeparator = getOption("bodySeparator");
+      if (bodySeparator != null) {
+        jobConfig.set("bodySeparatorOpt", bodySeparator);
+      }
+    }
+
+    jobConfig.set("bodyOpt", String.valueOf(hasOption("body")));
+
+    // resolve the file system from the input path itself so non-default schemes work
+    FileSystem fs = input.getFileSystem(jobConfig);
+    FileStatus fsFileStatus = fs.getFileStatus(input);
+
+    // the mapper strips this base path from each archive path when building record keys
+    jobConfig.set("baseinputpath", input.toString());
+    String inputDirList = buildDirList(fs, fsFileStatus);
+    FileInputFormat.setInputPaths(job, inputDirList);
+
+    // multiply in long arithmetic: the int product overflows for chunk sizes of 2048 MB or more
+    long chunkSizeInBytes = chunkSize * 1024L * 1024L;
+    // need to set this to a multiple of the block size, or no split happens
+    FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
+
+    // set the max split locations, otherwise we get nasty debug stuff
+    jobConfig.set("mapreduce.job.max.split.locations", "1000000");
+
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      return -1;
+    }
return 0;
}
}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.utils.email.MailOptions;
+import org.apache.mahout.utils.email.MailProcessor;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Map class for the SequenceFilesFromMailArchives job.
+ * <p>
+ * Each input record is one whole archive file: the key is the file's index within the
+ * {@link CombineFileSplit} and the value holds the file's bytes. The archive is split into
+ * messages, and one (key, content) pair is written per message, keyed by
+ * {@code prefix/relative-archive-path/message-id}.
+ */
+public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+
+  /** Message boundary line, e.g. "From user@example.com Mon Jul 24 19:13:53 2000". */
+  private static final Pattern MESSAGE_START = Pattern.compile(
+      "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
+  /** Message-ID header; group 1 captures the id between the angle brackets. */
+  private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile(
+      "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
+
+  // output holders reused across context.write() calls
+  private final Text outKey = new Text();
+  private final Text outValue = new Text();
+
+  private MailOptions options;
+
+  /**
+   * Rebuilds the {@link MailOptions} from the keys the driver stored in the job configuration.
+   */
+  @Override
+  public void setup(Context context) throws IOException, InterruptedException {
+    Configuration conf = context.getConfiguration();
+    this.options = new MailOptions();
+
+    options.setPrefix(conf.get("prefix", ""));
+
+    if (!conf.get("chunkSize", "").isEmpty()) {
+      options.setChunkSize(conf.getInt("chunkSize", 64));
+    }
+
+    // fall back to UTF-8 when the driver did not pass a charset
+    String charsetName = conf.get("charset", "");
+    options.setCharset(Charset.forName(charsetName.isEmpty() ? "UTF-8" : charsetName));
+
+    // patternOrder is used downstream so that we can know what order the text is in instead of
+    // encoding it in the string, which would require more processing later to remove it pre
+    // feature selection.
+    List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
+    Map<String, Integer> patternOrder = Maps.newHashMap();
+    int order = 0;
+
+    if (!conf.get("fromOpt", "").isEmpty()) {
+      patterns.add(MailProcessor.FROM_PREFIX);
+      patternOrder.put(MailOptions.FROM, order++);
+    }
+    if (!conf.get("toOpt", "").isEmpty()) {
+      patterns.add(MailProcessor.TO_PREFIX);
+      patternOrder.put(MailOptions.TO, order++);
+    }
+    if (!conf.get("refsOpt", "").isEmpty()) {
+      patterns.add(MailProcessor.REFS_PREFIX);
+      patternOrder.put(MailOptions.REFS, order++);
+    }
+    if (!conf.get("subjectOpt", "").isEmpty()) {
+      patterns.add(MailProcessor.SUBJECT_PREFIX);
+      patternOrder.put(MailOptions.SUBJECT, order++);
+    }
+
+    options.setStripQuotedText(conf.getBoolean("quotedOpt", false));
+    options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
+    options.setPatternOrder(patternOrder);
+    options.setIncludeBody(conf.getBoolean("bodyOpt", false));
+
+    // an empty value means "not set": keep the "\n" default
+    String separator = conf.get("separatorOpt", "");
+    options.setSeparator(separator.isEmpty() ? "\n" : separator);
+
+    String bodySeparator = conf.get("bodySeparatorOpt", "");
+    if (!bodySeparator.isEmpty()) {
+      options.setBodySeparator(bodySeparator);
+    }
+
+    String quotedRegex = conf.get("quotedRegexOpt", "");
+    if (!quotedRegex.isEmpty()) {
+      options.setQuotedTextPattern(Pattern.compile(quotedRegex));
+    }
+  }
+
+  /**
+   * Splits one mbox archive into messages and writes one (key, content) pair per message.
+   * <p>
+   * A message is collected only after its Message-ID header has been seen. It ends at the next
+   * "From ..." boundary line or at the end of the stream. Body lines are kept only when
+   * {@link MailOptions#isIncludeBody()} is set.
+   *
+   * @param filename        archive path used inside the output keys
+   * @param mboxInputStream the archive contents
+   * @param context         task context that receives the records
+   * @return the number of Message-ID headers encountered
+   */
+  public long parseMboxLineByLine(String filename, InputStream mboxInputStream, Context context)
+      throws IOException, InterruptedException {
+    long messageCount = 0;
+    try {
+      StringBuilder contents = new StringBuilder();
+      StringBuilder body = new StringBuilder();
+      Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
+      Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
+      Pattern[] patternsToMatch = options.getPatternsToMatch();
+      // NOTE(review): patternResults is never cleared between messages, so a header missing from
+      // one message keeps the previous message's value; left unchanged here
+      String[] patternResults = new String[patternsToMatch.length];
+      Matcher[] matchers = new Matcher[patternsToMatch.length];
+      for (int i = 0; i < matchers.length; i++) {
+        matchers[i] = patternsToMatch[i].matcher("");
+      }
+
+      String messageId = null;
+      boolean inBody = false;
+      Pattern quotedTextPattern = options.getQuotedTextPattern();
+
+      for (String nextLine : new FileLineIterable(mboxInputStream, options.getCharset(), false, filename)) {
+        // quoted lines are dropped entirely when stripping is enabled
+        if (options.isStripQuotedText() && quotedTextPattern.matcher(nextLine).find()) {
+          continue;
+        }
+
+        for (int i = 0; i < matchers.length; i++) {
+          Matcher matcher = matchers[i];
+          matcher.reset(nextLine);
+          if (matcher.matches()) {
+            patternResults[i] = matcher.group(1);
+          }
+        }
+
+        if (messageId == null) {
+          // not inside a message yet: look for the Message-ID header that opens one
+          // (cheap length pre-check before running the regex)
+          if (nextLine.length() > 14) {
+            messageIdMatcher.reset(nextLine);
+            if (messageIdMatcher.matches()) {
+              messageId = messageIdMatcher.group(1);
+              ++messageCount;
+            }
+          }
+          continue;
+        }
+
+        // only start appending body content after we've seen a message ID;
+        // first, see if we hit the end of the message
+        messageBoundaryMatcher.reset(nextLine);
+        if (messageBoundaryMatcher.matches()) {
+          // done parsing this message ... write it out
+          writeMessage(filename, messageId, contents, body, patternResults, context);
+          body.setLength(0);
+          messageId = null;
+          inBody = false;
+        } else if (inBody && options.isIncludeBody()) {
+          if (!nextLine.isEmpty()) {
+            body.append(nextLine).append(options.getBodySeparator());
+          }
+        } else {
+          // first empty line we see after reading the message Id indicates that we are in the body
+          inBody = nextLine.isEmpty();
+        }
+      }
+
+      // write the last message in the file if available
+      if (messageId != null) {
+        writeMessage(filename, messageId, contents, body, patternResults, context);
+      }
+    } catch (FileNotFoundException ignored) {
+      // NOTE(review): deliberately ignored as before; consider reporting it and continuing
+    }
+    return messageCount;
+  }
+
+  /** Assembles one message into {@code contents}, emits it, then clears {@code contents}. */
+  private void writeMessage(String filename, String messageId, StringBuilder contents, CharSequence body,
+      String[] patternResults, Context context) throws IOException, InterruptedException {
+    String key = generateKey(filename, options.getPrefix(), messageId);
+    // if this ordering changes, then also change FromEmailToDictionaryMapper
+    writeContent(options.getSeparator(), contents, body, patternResults);
+    outKey.set(key);
+    outValue.set(contents.toString());
+    context.write(outKey, outValue);
+    contents.setLength(0); // reset the buffer
+  }
+
+  protected static String generateKey(String mboxFilename, String prefix, String messageId) {
+    return prefix + File.separator + mboxFilename + File.separator + messageId;
+  }
+
+  /** Appends every matched header (empty when not found) followed by a separator, then the body. */
+  private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) {
+    for (String match : matches) {
+      if (match != null) {
+        contents.append(match);
+      }
+      // the separator is written even for missing headers so field positions stay fixed
+      contents.append(separator);
+    }
+    contents.append(body);
+  }
+
+  /**
+   * Returns {@code filePath} relative to the job's "baseinputpath", or the full path (with any
+   * "file:" scheme removed) when no base path is configured or it does not occur in the path.
+   */
+  public static String calcRelativeFilePath(Configuration conf, Path filePath) throws IOException {
+    FileSystem fs = filePath.getFileSystem(conf);
+    FileStatus fst = fs.getFileStatus(filePath);
+    String currentPath = fst.getPath().toString().replaceFirst("file:", "");
+
+    String basePath = conf.get("baseinputpath");
+    if (basePath == null) {
+      return currentPath;
+    }
+    if (!basePath.endsWith("/")) {
+      basePath += "/";
+    }
+    basePath = basePath.replaceFirst("file:", "");
+
+    // literal substring search: the base path is not a regex, so characters such as
+    // '+', '(' or '[' in directory names must not be interpreted
+    int start = currentPath.indexOf(basePath);
+    if (start >= 0) {
+      String relative = currentPath.substring(start + basePath.length());
+      if (!relative.isEmpty()) {
+        return relative;
+      }
+    }
+    return currentPath;
+  }
+
+  @Override
+  public void map(IntWritable key, BytesWritable value, Context context)
+      throws IOException, InterruptedException {
+    Configuration configuration = context.getConfiguration();
+    Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+    String relativeFilePath = calcRelativeFilePath(configuration, filePath);
+    // getBytes() returns the backing buffer, which may be longer than the valid data
+    ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes(), 0, value.getLength());
+    parseMboxLineByLine(relativeFilePath, is, context);
+  }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java?rev=1495863&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java Sun Jun 23 18:15:56 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * RecordReader used with the MultipleTextFileInputFormat class to read full files as
+ * k/v pairs and groups of files as single input splits.
+ * <p>
+ * One instance reads exactly one file of a {@link CombineFileSplit}. It yields a single record
+ * whose key is the file's index within the split and whose value is the file's entire contents.
+ */
+public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> {
+
+  private final FileSplit fileSplit;
+  private final Configuration configuration;
+  private final BytesWritable value = new BytesWritable();
+  private final IntWritable index;
+  private boolean processed = false;
+
+  /**
+   * @param fileSplit          the combined split
+   * @param taskAttemptContext supplies the configuration used to open the file
+   * @param idx                index of the file within {@code fileSplit} handled by this reader
+   */
+  public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
+      throws IOException {
+    this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx),
+        fileSplit.getLength(idx), fileSplit.getLocations());
+    this.configuration = taskAttemptContext.getConfiguration();
+    this.index = new IntWritable(idx);
+  }
+
+  @Override
+  public IntWritable getCurrentKey() {
+    return index;
+  }
+
+  @Override
+  public BytesWritable getCurrentValue() {
+    return value;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return processed ? 1.0f : 0.0f;
+  }
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    // nothing to do: the constructor already captured the file and the configuration
+  }
+
+  /**
+   * Reads the whole file into the value on the first call.
+   *
+   * @return true once, when the file was read; false afterwards or if the path is not a regular file
+   * @throws IOException if the file cannot be read or is too large for a single byte array
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (processed) {
+      return false;
+    }
+    Path file = fileSplit.getPath();
+    long length = fileSplit.getLength();
+    // the whole file goes into one byte[]: fail loudly instead of silently truncating the int cast
+    if (length > Integer.MAX_VALUE) {
+      throw new IOException("File " + file + " is too large to read as a single record: " + length + " bytes");
+    }
+    FileSystem fs = file.getFileSystem(this.configuration);
+    FSDataInputStream in = null;
+    try {
+      if (!fs.isFile(file)) {
+        return false;
+      }
+      byte[] contents = new byte[(int) length];
+      in = fs.open(file);
+      IOUtils.readFully(in, contents, 0, contents.length);
+      // size the capacity first so set() does not over-allocate the backing buffer
+      value.setCapacity(contents.length);
+      value.set(contents, 0, contents.length);
+    } finally {
+      Closeables.close(in, false);
+    }
+    processed = true;
+    return true;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // the input stream is already closed at the end of nextKeyValue()
+  }
+}
\ No newline at end of file
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java Sun Jun 23 18:15:56 2013
@@ -22,9 +22,10 @@ import java.util.zip.GZIPOutputStream;
import com.google.common.io.Closeables;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
-
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
import org.apache.mahout.utils.MahoutTestCase;
@@ -36,11 +37,10 @@ import org.junit.Test;
* Test case for the SequenceFilesFromMailArchives command-line application.
*/
public final class SequenceFilesFromMailArchivesTest extends MahoutTestCase {
-
+
// TODO: Negative tests
- private File inputDir = null;
- private File outputDir = null;
+ // Root of the generated mail-archive tree; assigned in setUp() and read by both tests.
+ private File inputDir;
/**
* Create the input and output directories needed for testing
@@ -51,8 +51,7 @@ public final class SequenceFilesFromMail
public void setUp() throws Exception {
super.setUp();
inputDir = getTestTempDir("mail-archives-in");
- outputDir = getTestTempDir("mail-archives-out");
-
+
// write test mail messages to a gzipped file in a nested directory
File subDir = new File(inputDir, "subdir");
subDir.mkdir();
@@ -64,58 +63,130 @@ public final class SequenceFilesFromMail
gzOut.finish();
} finally {
Closeables.close(gzOut, false);
+ }
+
+ File subDir2 = new File(subDir, "subsubdir");
+ subDir2.mkdir();
+ File gzFile2 = new File(subDir2, "mail-messages-2.gz");
+ try {
+ gzOut = new GZIPOutputStream(new FileOutputStream(gzFile2));
+ gzOut.write(testMailMessages.getBytes("UTF-8"));
+ gzOut.finish();
+ } finally {
+ Closeables.close(gzOut, false);
}
}
- /**
- * Test the main method of the SequenceFilesFromMailArchives
- * command-line application.
- */
+ /**
+ * Runs the sequential conversion and checks that the messages of both archives (subdir and the
+ * nested subsubdir) are written to chunk-0, in that order.
+ */
@Test
- public void testMain() throws Exception {
+ public void testSequential() throws Exception {
+
+ File outputDir = this.getTestTempDir("mail-archives-out");
+
String[] args = {
- "--input", inputDir.getAbsolutePath(),
+ "--input", inputDir.getAbsolutePath(),
"--output", outputDir.getAbsolutePath(),
"--charset", "UTF-8",
"--keyPrefix", "TEST",
- "--body", "--subject", "--separator", ""
+ "--method", "sequential",
+ "--body", "--subject", "--separator", ""
};
-
+
// run the application's main method
SequenceFilesFromMailArchives.main(args);
-
- // app should create a single SequenceFile named "chunk-0"
- // in the output dir
+
+ // app should create a single SequenceFile named "chunk-0" in the output dir
File expectedChunkFile = new File(outputDir, "chunk-0");
String expectedChunkPath = expectedChunkFile.getAbsolutePath();
- Assert.assertTrue("Expected chunk file "+expectedChunkPath+" not found!", expectedChunkFile.isFile());
-
+ Assert.assertTrue("Expected chunk file " + expectedChunkPath + " not found!", expectedChunkFile.isFile());
Configuration conf = new Configuration();
- SequenceFileIterator<Text,Text> iterator =
- new SequenceFileIterator<Text,Text>(new Path(expectedChunkPath), true, conf);
-
+ SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(new Path(expectedChunkPath), true, conf);
Assert.assertTrue("First key/value pair not found!", iterator.hasNext());
- Pair<Text,Text> record = iterator.next();
+ Pair<Text, Text> record = iterator.next();
File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz");
Assert.assertEquals(new File(parentFile, testVars[0][0]).toString(), record.getFirst().toString());
- Assert.assertEquals(testVars[0][1]+testVars[0][2], record.getSecond().toString());
+ Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
Assert.assertTrue("Second key/value pair not found!", iterator.hasNext());
+
record = iterator.next();
Assert.assertEquals(new File(parentFile, testVars[1][0]).toString(), record.getFirst().toString());
- Assert.assertEquals(testVars[1][1]+testVars[1][2], record.getSecond().toString());
+ Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+
+ Assert.assertTrue("Third key/value pair not found!", iterator.hasNext());
+ record = iterator.next();
+ File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz");
+ Assert.assertEquals(new File(parentFileSubSubDir, testVars[0][0]).toString(), record.getFirst().toString());
+ Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+
+ Assert.assertTrue("Fourth key/value pair not found!", iterator.hasNext());
+ record = iterator.next();
+ Assert.assertEquals(new File(parentFileSubSubDir, testVars[1][0]).toString(), record.getFirst().toString());
+ Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
- Assert.assertFalse("Only two key/value pairs expected!", iterator.hasNext());
+ Assert.assertFalse("Only four key/value pairs expected!", iterator.hasNext());
}
-
- // Messages extracted and anonymized from the ASF mail archives
+ /**
+ * Runs the map-reduce conversion over the same two archives and checks that all four messages
+ * land in the single map output file.
+ */
+ @Test
+ public void testMapReduce() throws Exception {
+
+ Path tmpDir = this.getTestTempDirPath();
+ Path mrOutputDir = new Path(tmpDir, "mail-archives-out-mr");
+ Configuration configuration = new Configuration();
+ FileSystem fs = FileSystem.get(configuration);
+
+ String[] args = {
+ "--input", inputDir.getAbsolutePath(),
+ "--output", mrOutputDir.toString(),
+ "--charset", "UTF-8",
+ "--keyPrefix", "TEST",
+ "--method", "mapreduce",
+ "--body", "--subject", "--separator", ""
+ };
+
+ // run the application's main method
+ SequenceFilesFromMailArchives.main(args);
+
+ // the map-only job should create a single SequenceFile named "part-m-00000" in the output dir
+ Path partFile = mrOutputDir.suffix("/part-m-00000");
+ FileStatus[] fileStatuses = fs.listStatus(partFile);
+ Assert.assertEquals(1, fileStatuses.length); // only one
+ Assert.assertEquals("part-m-00000", fileStatuses[0].getPath().getName());
+ SequenceFileIterator<Text, Text> iterator =
+ new SequenceFileIterator<Text, Text>(partFile, true, configuration);
+
+ // NOTE(review): the expected order (nested archive first) depends on how the input format
+ // orders files within the combined split -- confirm that order is deterministic
+ File parentFileSubSubDir = new File(new File(new File(new File("TEST"), "subdir"), "subsubdir"), "mail-messages-2.gz");
+
+ Assert.assertTrue("First key/value pair not found!", iterator.hasNext());
+ Pair<Text, Text> record = iterator.next();
+ Assert.assertEquals(new File(parentFileSubSubDir, testVars[0][0]).toString(), record.getFirst().toString());
+ Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+
+ Assert.assertTrue("Second key/value pair not found!", iterator.hasNext());
+ record = iterator.next();
+ Assert.assertEquals(new File(parentFileSubSubDir, testVars[1][0]).toString(), record.getFirst().toString());
+ Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+
+ // test other file
+ File parentFile = new File(new File(new File("TEST"), "subdir"), "mail-messages.gz");
+ Assert.assertTrue("Third key/value pair not found!", iterator.hasNext());
+ record = iterator.next();
+ Assert.assertEquals(new File(parentFile, testVars[0][0]).toString(), record.getFirst().toString());
+ Assert.assertEquals(testVars[0][1] + testVars[0][2], record.getSecond().toString());
+
+ Assert.assertTrue("Fourth key/value pair not found!", iterator.hasNext());
+ record = iterator.next();
+ Assert.assertEquals(new File(parentFile, testVars[1][0]).toString(), record.getFirst().toString());
+ Assert.assertEquals(testVars[1][1] + testVars[1][2], record.getSecond().toString());
+ Assert.assertFalse("Only four key/value pairs expected!", iterator.hasNext());
+ }
+
+ // Messages extracted and made anonymous from the ASF mail archives
private static final String[][] testVars = {
new String[] {
"user@example.com",
- "Ant task for JDK1.1 collections build option",
+ "Ant task for JDK1.1 collections build option",
"\nThis is just a test message\n--\nTesty McTester\n"
},
new String[] {
@@ -124,38 +195,38 @@ public final class SequenceFilesFromMail
"\nHi all,\nThis is another test message.\nRegards,\nAnother Test\n"
}
};
-
+
+ // Two-message mbox fixture built from testVars. Each message starts with a
+ // "From <address> <date>" line; its body (testVars[i][2], which begins with "\n") follows the
+ // headers after a blank line. The second message lists Subject before its Message-Id header.
private static final String testMailMessages =
"From user@example.com Mon Jul 24 19:13:53 2000\n"
- + "Return-Path: <us...@example.com>\n"
- + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
- + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
- + "Received: (qmail 49267 invoked from network); 24 Jul 2000 19:13:53 -0000\n"
- + "Message-ID: <"+testVars[0][0]+">\n"
- + "From: \"Testy McTester\" <us...@example.com>\n"
- + "To: <an...@jakarta.apache.org>\n"
- + "Subject: "+testVars[0][1]+ '\n'
- + "Date: Mon, 24 Jul 2000 12:24:56 -0700\n"
- + "MIME-Version: 1.0\n"
- + "Content-Type: text/plain;\n"
- + " charset=\"Windows-1252\"\n"
- + "Content-Transfer-Encoding: 7bit\n"
- + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
- + testVars[0][2]+'\n'
- + "From somebody@example.com Wed Jul 26 11:32:16 2000\n"
- + "Return-Path: <so...@example.com>\n"
- + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
- + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
- + "Received: (qmail 73966 invoked from network); 26 Jul 2000 11:32:16 -0000\n"
- + "User-Agent: Microsoft-Outlook-Express-Macintosh-Edition/5.02.2022\n"
- + "Date: Wed, 26 Jul 2000 13:32:08 +0200\n"
- + "Subject: "+testVars[1][1]+ '\n'
- + "From: Another Test <so...@example.com>\n"
- + "To: <an...@jakarta.apache.org>\n"
- + "Message-Id: <"+testVars[1][0]+">\n"
- + "Mime-Version: 1.0\n"
- + "Content-Type: text/plain; charset=\"US-ASCII\"\n"
- + "Content-Transfer-Encoding: 7bit\n"
- + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
- + testVars[1][2];
+ + "Return-Path: <us...@example.com>\n"
+ + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
+ + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
+ + "Received: (qmail 49267 invoked from network); 24 Jul 2000 19:13:53 -0000\n"
+ + "Message-ID: <" + testVars[0][0] + ">\n"
+ + "From: \"Testy McTester\" <us...@example.com>\n"
+ + "To: <an...@jakarta.apache.org>\n"
+ + "Subject: " + testVars[0][1] + '\n'
+ + "Date: Mon, 24 Jul 2000 12:24:56 -0700\n"
+ + "MIME-Version: 1.0\n"
+ + "Content-Type: text/plain;\n"
+ + " charset=\"Windows-1252\"\n"
+ + "Content-Transfer-Encoding: 7bit\n"
+ + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
+ + testVars[0][2] + '\n'
+ + "From somebody@example.com Wed Jul 26 11:32:16 2000\n"
+ + "Return-Path: <so...@example.com>\n"
+ + "Mailing-List: contact ant-user-help@jakarta.apache.org; run by ezmlm\n"
+ + "Delivered-To: mailing list ant-user@jakarta.apache.org\n"
+ + "Received: (qmail 73966 invoked from network); 26 Jul 2000 11:32:16 -0000\n"
+ + "User-Agent: Microsoft-Outlook-Express-Macintosh-Edition/5.02.2022\n"
+ + "Date: Wed, 26 Jul 2000 13:32:08 +0200\n"
+ + "Subject: " + testVars[1][1] + '\n'
+ + "From: Another Test <so...@example.com>\n"
+ + "To: <an...@jakarta.apache.org>\n"
+ + "Message-Id: <" + testVars[1][0] + ">\n"
+ + "Mime-Version: 1.0\n"
+ + "Content-Type: text/plain; charset=\"US-ASCII\"\n"
+ + "Content-Transfer-Encoding: 7bit\n"
+ + "X-Spam-Rating: locus.apache.org 1.6.2 0/1000/N\n"
+ + testVars[1][2];
}
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1495863&r1=1495862&r2=1495863&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Sun Jun 23 18:15:56 2013
@@ -17,6 +17,7 @@
package org.apache.mahout.text;
+import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Map;
@@ -30,53 +31,160 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Text;
+import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
import org.apache.mahout.utils.MahoutTestCase;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public final class TestSequenceFilesFromDirectory extends MahoutTestCase {
+ private static final Logger logger = LoggerFactory.getLogger(TestSequenceFilesFromDirectory.class);
+
+ // Fixture for the flat input directory: rows of {name, text}, presumably file name and file
+ // contents as consumed by createFilesFromArrays / checkChunkFiles (helpers not shown here).
private static final String[][] DATA1 = {
- {"test1", "This is the first text."},
- {"test2", "This is the second text."},
- {"test3", "This is the third text."}
+ {"test1", "This is the first text."},
+ {"test2", "This is the second text."},
+ {"test3", "This is the third text."}
+ };
+
+ // Same row layout as DATA1, used for the nested-directory runs
+ // (createRecursiveDirFilesFromArrays / checkRecursiveChunkFiles).
+ private static final String[][] DATA2 = {
+ {"recursive_test1", "This is the first text."},
+ {"recursive_test2", "This is the second text."},
+ {"recursive_test3", "This is the third text."}
};
- /**
- * Story converting text files to SequenceFile
- */
@Test
public void testSequenceFileFromDirectoryBasic() throws Exception {
// parameters
Configuration conf = new Configuration();
-
+
FileSystem fs = FileSystem.get(conf);
-
+
// create
Path tmpDir = this.getTestTempDirPath();
Path inputDir = new Path(tmpDir, "inputDir");
fs.mkdirs(inputDir);
+
Path outputDir = new Path(tmpDir, "outputDir");
-
+ Path outputDirRecursive = new Path(tmpDir, "outputDirRecursive");
+
+ Path inputDirRecursive = new Path(tmpDir, "inputDirRecur");
+ fs.mkdirs(inputDirRecursive);
+
// prepare input files
createFilesFromArrays(conf, inputDir, DATA1);
- SequenceFilesFromDirectory.main(new String[] {
- "--input", inputDir.toString(),
- "--output", outputDir.toString(),
- "--chunkSize", "64",
- "--charset", Charsets.UTF_8.name(),
- "--keyPrefix", "UID"});
-
+ SequenceFilesFromDirectory.main(new String[]{
+ "--input", inputDir.toString(),
+ "--output", outputDir.toString(),
+ "--chunkSize", "64",
+ "--charset", Charsets.UTF_8.name(),
+ "--keyPrefix", "UID",
+ "--method", "sequential"});
+
// check output chunk files
checkChunkFiles(conf, outputDir, DATA1, "UID");
+
+ // repeat the sequential conversion on a nested directory tree built from DATA2
+ createRecursiveDirFilesFromArrays(conf, inputDirRecursive, DATA2);
+
+ FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive);
+ String dirs = AbstractJob.buildDirList(fs, fstInputPath);
+
+ logger.info("recursive dirs: {}", dirs);
+ SequenceFilesFromDirectory.main(new String[]{
+ "--input", inputDirRecursive.toString(),
+ "--output", outputDirRecursive.toString(),
+ "--chunkSize", "64",
+ "--charset", Charsets.UTF_8.name(),
+ "--keyPrefix", "UID",
+ "--method", "sequential"});
+
+ checkRecursiveChunkFiles(conf, outputDirRecursive, DATA2, "UID");
}
+
+  /**
+   * Converts a flat directory and then a nested directory to SequenceFiles with
+   * {@code --method mapreduce}, checking the records in each job's part-m-00000 output.
+   */
+  @Test
+  public void testSequenceFileFromDirectoryMapReduce() throws Exception {
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+
+    // create input/output locations under the test temp dir
+    Path tmpDir = this.getTestTempDirPath();
+    Path inputDir = new Path(tmpDir, "inputDir");
+    fs.mkdirs(inputDir);
+
+    Path inputDirRecur = new Path(tmpDir, "inputDirRecur");
+    fs.mkdirs(inputDirRecur);
+
+    Path mrOutputDir = new Path(tmpDir, "mrOutputDir");
+    Path mrOutputDirRecur = new Path(tmpDir, "mrOutputDirRecur");
+
+    // flat input: one file per DATA1 row
+    createFilesFromArrays(conf, inputDir, DATA1);
+
+    SequenceFilesFromDirectory.main(new String[]{
+        "--input", inputDir.toString(),
+        "--output", mrOutputDir.toString(),
+        "--chunkSize", "64",
+        "--charset", Charsets.UTF_8.name(),
+        "--method", "mapreduce",
+        "--keyPrefix", "UID"});
+
+    checkMRResultFiles(conf, mrOutputDir, DATA1, "UID");
+
+    // nested input: each DATA2 row is a directory (inside the previous one) holding file.txt
+    createRecursiveDirFilesFromArrays(conf, inputDirRecur, DATA2);
+
+    // log what AbstractJob.buildDirList reports for the nested input (diagnostic only)
+    FileStatus inputDirRecurStatus = fs.getFileStatus(inputDirRecur);
+    String dirs = AbstractJob.buildDirList(fs, inputDirRecurStatus);
+    logger.info("\n\n ---- recursive dirs: {}", dirs);
+
+    SequenceFilesFromDirectory.main(new String[]{
+        "--input", inputDirRecur.toString(),
+        "--output", mrOutputDirRecur.toString(),
+        "--chunkSize", "64",
+        "--charset", Charsets.UTF_8.name(),
+        "--method", "mapreduce",
+        "--keyPrefix", "UID"});
+
+    checkMRResultFilesRecursive(conf, mrOutputDirRecur, DATA2, "UID");
+  }
+
+
private static void createFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
FileSystem fs = FileSystem.get(conf);
+ OutputStreamWriter writer;
for (String[] aData : data) {
- OutputStreamWriter writer = new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), Charsets.UTF_8);
+ writer = new OutputStreamWriter(fs.create(new Path(inputDir, aData[0])), Charsets.UTF_8);
+ try {
+ writer.write(aData[1]);
+ } finally {
+ Closeables.close(writer, false);
+ }
+ }
+ }
+
+ private static void createRecursiveDirFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+
+ logger.info("creativeRecursiveDirFilesFromArrays > based on: {}", inputDir.toString());
+ Path curPath;
+ String currentRecursiveDir = inputDir.toString();
+
+ for (String[] aData : data) {
+ OutputStreamWriter writer;
+
+ currentRecursiveDir += "/" + aData[0];
+ File subDir = new File(currentRecursiveDir);
+ subDir.mkdir();
+
+ curPath = new Path(subDir.toString(), "file.txt");
+ writer = new OutputStreamWriter(fs.create(curPath), Charsets.UTF_8);
+
+ logger.info("Created file: {}", curPath.toString());
+
try {
writer.write(aData[1]);
} finally {
@@ -90,24 +198,22 @@ public final class TestSequenceFilesFrom
String[][] data,
String prefix) throws IOException {
FileSystem fs = FileSystem.get(conf);
-
+
// output exists?
- FileStatus[] fstats = fs.listStatus(outputDir, new ExcludeDotFiles());
- assertEquals(1, fstats.length); // only one
- assertEquals("chunk-0", fstats[0].getPath().getName());
-
+ FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+ assertEquals(1, fileStatuses.length); // only one
+ assertEquals("chunk-0", fileStatuses[0].getPath().getName());
- Map<String,String> fileToData = Maps.newHashMap();
+ Map<String, String> fileToData = Maps.newHashMap();
for (String[] aData : data) {
fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
}
// read a chunk to check content
- SequenceFileIterator<Text,Text> iterator = new SequenceFileIterator<Text,Text>(fstats[0].getPath(), true, conf);
+ SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(fileStatuses[0].getPath(), true, conf);
try {
- for (String[] datum : data) {
- assertTrue(iterator.hasNext());
- Pair<Text,Text> record = iterator.next();
+ while (iterator.hasNext()) {
+ Pair<Text, Text> record = iterator.next();
String retrievedData = fileToData.get(record.getFirst().toString().trim());
assertNotNull(retrievedData);
assertEquals(retrievedData, record.getSecond().toString().trim());
@@ -116,16 +222,116 @@ public final class TestSequenceFilesFrom
Closeables.close(iterator, true);
}
}
-
+
/**
* exclude hidden (starting with dot) files
*/
private static class ExcludeDotFiles implements PathFilter {
@Override
public boolean accept(Path file) {
+ // NOTE(review): this now also rejects "_"-prefixed names, so the javadoc above is incomplete;
+ // presumably meant to hide MapReduce job bookkeeping entries (e.g. _SUCCESS) - confirm.
- return !file.getName().startsWith(".");
+ return !file.getName().startsWith(".") && !file.getName().startsWith("_");
}
}
+
+  /**
+   * Verifies sequential output for the nested fixture: exactly one visible file, chunk-0, whose
+   * records are exactly the expected prefix/dir_1/.../dir_i/file.txt entries.
+   */
+  private static void checkRecursiveChunkFiles(Configuration conf,
+                                               Path outputDir,
+                                               String[][] data,
+                                               String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path chunk = singleVisibleFile(fs, outputDir, "chunk-0");
+    assertRecordsMatch(conf, chunk, recursiveExpectations(prefix, data));
+  }
+
+  /**
+   * Verifies map-reduce output for the flat fixture: exactly one visible file, part-m-00000, whose
+   * records are exactly {prefix/fileName -> contents} for every row of data.
+   */
+  private static void checkMRResultFiles(Configuration conf, Path outputDir,
+                                         String[][] data, String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path part = singleVisibleFile(fs, outputDir, "part-m-00000");
+    Map<String, String> fileToData = Maps.newHashMap();
+    for (String[] aData : data) {
+      fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+    }
+    assertRecordsMatch(conf, part, fileToData);
+  }
+
+  /**
+   * Verifies map-reduce output for the nested fixture: exactly one visible file, part-m-00000,
+   * whose records are exactly the expected prefix/dir_1/.../dir_i/file.txt entries.
+   */
+  private static void checkMRResultFilesRecursive(Configuration conf, Path outputDir,
+                                                  String[][] data, String prefix) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path part = singleVisibleFile(fs, outputDir, "part-m-00000");
+    assertRecordsMatch(conf, part, recursiveExpectations(prefix, data));
+  }
+
+  /**
+   * Asserts that outputDir holds exactly one entry not filtered out by ExcludeDotFiles, named
+   * expectedName, and returns its path. Listing the directory (not the file itself) is what lets
+   * this catch extra output files such as a second part file.
+   */
+  private static Path singleVisibleFile(FileSystem fs, Path outputDir, String expectedName) throws IOException {
+    FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
+    assertEquals(1, fileStatuses.length);
+    assertEquals(expectedName, fileStatuses[0].getPath().getName());
+    return fileStatuses[0].getPath();
+  }
+
+  /**
+   * Builds the expected key -> contents map for the layout written by
+   * createRecursiveDirFilesFromArrays: row i lives at prefix/dir_1/.../dir_i/file.txt.
+   */
+  private static Map<String, String> recursiveExpectations(String prefix, String[][] data) {
+    Map<String, String> fileToData = Maps.newHashMap();
+    String currentPath = prefix;
+    for (String[] aData : data) {
+      currentPath += Path.SEPARATOR + aData[0];
+      fileToData.put(currentPath + Path.SEPARATOR + "file.txt", aData[1]);
+    }
+    return fileToData;
+  }
+
+  /**
+   * Reads every record of sequenceFile and asserts that each trimmed key is expected, maps to the
+   * expected trimmed value, occurs only once, and that no expected key is missing.
+   */
+  private static void assertRecordsMatch(Configuration conf, Path sequenceFile,
+                                         Map<String, String> expected) throws IOException {
+    // work on a copy so entries can be crossed off as they are seen
+    Map<String, String> remaining = Maps.newHashMap(expected);
+    SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(sequenceFile, true, conf);
+    try {
+      while (iterator.hasNext()) {
+        Pair<Text, Text> record = iterator.next();
+        String key = record.getFirst().toString().trim();
+        String value = record.getSecond().toString().trim();
+        logger.debug("{} >> {}", key, value);
+        // remove() makes a repeated key fail here, because its entry is already gone
+        String expectedValue = remaining.remove(key);
+        assertNotNull("unexpected or duplicate key: " + key, expectedValue);
+        assertEquals(expectedValue, value);
+      }
+    } finally {
+      Closeables.close(iterator, true);
+    }
+    // without this, an empty or truncated output file would pass the loop above vacuously
+    assertTrue("records missing from " + sequenceFile + ": " + remaining.keySet(), remaining.isEmpty());
+  }
}