You are viewing a plain-text version of this content; the canonical link to it (originally a "here" hyperlink) is not preserved in this view.
Posted to commits@mahout.apache.org by sm...@apache.org on 2013/06/26 15:34:46 UTC
svn commit: r1496927 - in /mahout/trunk/integration/src:
main/java/org/apache/mahout/text/ test/java/org/apache/mahout/text/
Author: smarthi
Date: Wed Jun 26 13:34:45 2013
New Revision: 1496927
URL: http://svn.apache.org/r1496927
Log:
MAHOUT-833: Make conversion to sequence files map-reduce (changes based on feedback from review).
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1496927&r1=1496926&r2=1496927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Wed Jun 26 13:34:45 2013
@@ -52,9 +52,13 @@ public class SequenceFilesFromDirectory
private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
private static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
- private static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
private static final String[] CHARSET_OPTION = {"charset", "c"};
+ private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
+
+ public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
+ public static final String BASE_INPUT_PATH = "baseinputpath";
+
public static void main(String[] args) throws Exception {
ToolRunner.run(new SequenceFilesFromDirectory(), args);
}
@@ -131,16 +135,16 @@ public class SequenceFilesFromDirectory
SequenceFileOutputFormat.class, "SequenceFilesFromDirectory");
Configuration jobConfig = job.getConfiguration();
- jobConfig.set("keyPrefix", keyPrefix);
+ jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix);
FileSystem fs = FileSystem.get(jobConfig);
FileStatus fsFileStatus = fs.getFileStatus(input);
String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
- jobConfig.set("baseinputpath", input.toString());
+ jobConfig.set(BASE_INPUT_PATH, input.toString());
long chunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
// set the max split locations, otherwise we get nasty debug stuff
- jobConfig.set("mapreduce.job.max.split.locations", "1000000");
+ jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
FileInputFormat.setInputPaths(job, inputDirList);
// need to set this to a multiple of the block size, or no split happens
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java?rev=1496927&r1=1496926&r2=1496927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java Wed Jun 26 13:34:45 2013
@@ -28,6 +28,8 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.mahout.common.HadoopUtil;
+import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION;
+
/**
* Map class for SequenceFilesFromDirectory MR job
*/
@@ -39,7 +41,7 @@ public class SequenceFilesFromDirectoryM
@Override
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
- this.keyPrefix = context.getConfiguration().get("keyPrefix", "");
+ this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], "");
}
public void map(IntWritable key, BytesWritable value, Context context)
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1496927&r1=1496926&r2=1496927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java Wed Jun 26 13:34:45 2013
@@ -16,19 +16,9 @@
*/
package org.apache.mahout.text;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
-import org.apache.commons.cli2.builder.ArgumentBuilder;
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -47,6 +37,14 @@ import org.apache.mahout.utils.io.Chunke
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
/**
* Converts a directory of gzipped mail archives into SequenceFiles of specified
* chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except
@@ -57,6 +55,22 @@ public final class SequenceFilesFromMail
private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class);
+ public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
+ public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
+ public static final String[] CHARSET_OPTION = {"charset", "c"};
+ public static final String[] SUBJECT_OPTION = {"subject", "s"};
+ public static final String[] TO_OPTION = {"to", "to"};
+ public static final String[] FROM_OPTION = {"from", "from"};
+ public static final String[] REFERENCES_OPTION = {"references", "refs"};
+ public static final String[] BODY_OPTION = {"body", "b"};
+ public static final String[] STRIP_QUOTED_OPTION = {"stripQuoted", "q"};
+ public static final String[] QUOTED_REGEX_OPTION = {"quotedRegex", "regex"};
+ public static final String[] SEPARATOR_OPTION = {"separator", "sep"};
+ public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"};
+ public static final String BASE_INPUT_PATH = "baseinputpath";
+
+ private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
+
public void createSequenceFiles(MailOptions options) throws IOException {
ChunkedWriter writer = new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()));
MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer);
@@ -119,69 +133,31 @@ public final class SequenceFilesFromMail
@Override
public int run(String[] args) throws Exception {
- DefaultOptionBuilder optionBuilder = new DefaultOptionBuilder();
- ArgumentBuilder argumentBuilder = new ArgumentBuilder();
-
addInputOption();
addOutputOption();
addOption(DefaultOptionCreator.methodOption().create());
- addOption(optionBuilder.withLongName("chunkSize").withArgument(
- argumentBuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create())
- .withDescription("The chunkSize in MegaBytes. Defaults to 64")
- .withShortName("chunk").create());
-
- addOption(optionBuilder.withLongName("keyPrefix").withArgument(
- argumentBuilder.withName("keyPrefix").withMinimum(1).withMaximum(1).create())
- .withDescription("The prefix to be prepended to the key")
- .withShortName("prefix").create());
- addOption(optionBuilder.withLongName("charset")
- .withRequired(true).withArgument(argumentBuilder.withName("charset")
- .withMinimum(1).withMaximum(1).create()).withDescription(
- "The name of the character encoding of the input files")
- .withShortName("c").create());
- addOption(optionBuilder.withLongName("subject")
- .withRequired(false).withDescription(
- "Include the Mail subject as part of the text. Default is false")
- .withShortName("s").create());
- addOption(optionBuilder.withLongName("to").withRequired(false)
- .withDescription("Include the to field in the text. Default is false")
- .withShortName("to").create());
- addOption(optionBuilder.withLongName("from").withRequired(false).withDescription(
- "Include the from field in the text. Default is false")
- .withShortName("from").create());
- addOption(optionBuilder.withLongName("references")
- .withRequired(false).withDescription(
- "Include the references field in the text. Default is false")
- .withShortName("refs").create());
- addOption(optionBuilder.withLongName("body").withRequired(false)
- .withDescription("Include the body in the output. Default is false")
- .withShortName("b").create());
- addOption(optionBuilder.withLongName("stripQuoted")
- .withRequired(false).withDescription(
- "Strip (remove) quoted email text in the body. Default is false")
- .withShortName("q").create());
- addOption(
- optionBuilder.withLongName("quotedRegex")
- .withRequired(false).withArgument(argumentBuilder.withName("regex")
- .withMinimum(1).withMaximum(1).create()).withDescription(
+ addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
+ addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
+ addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+ "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+ addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text. Default is false");
+ addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text. Default is false");
+ addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text. Default is false");
+ addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1],
+ "Include the references field in the text. Default is false");
+ addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output. Default is false");
+ addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1],
+ "Strip (remove) quoted email text in the body. Default is false");
+ addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1],
"Specify the regex that identifies quoted text. "
- + "Default is to look for > or | at the beginning of the line.")
- .withShortName("q").create());
- addOption(
- optionBuilder.withLongName("separator")
- .withRequired(false).withArgument(argumentBuilder.withName("separator")
- .withMinimum(1).withMaximum(1).create()).withDescription(
- "The separator to use between metadata items (to, from, etc.). Default is \\n")
- .withShortName("sep").create());
-
- addOption(
- optionBuilder.withLongName("bodySeparator")
- .withRequired(false).withArgument(argumentBuilder.withName("bodySeparator")
- .withMinimum(1).withMaximum(1).create()).withDescription(
+ + "Default is to look for > or | at the beginning of the line.");
+ addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1],
+ "The separator to use between metadata items (to, from, etc.). Default is \\n", "\n");
+ addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1],
"The separator to use between lines in the body. Default is \\n. "
- + "Useful to change if you wish to have the message be on one line")
- .withShortName("bodySep").create());
+ + "Useful to change if you wish to have the message be on one line", "\n");
+
addOption(DefaultOptionCreator.helpOption());
Map<String, List<String>> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
@@ -191,16 +167,16 @@ public final class SequenceFilesFromMail
String outputDir = getOutputPath().toString();
int chunkSize = 64;
- if (hasOption("chunkSize")) {
- chunkSize = Integer.parseInt(getOption("chunkSize"));
+ if (hasOption(CHUNK_SIZE_OPTION[0])) {
+ chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
}
String prefix = "";
- if (hasOption("keyPrefix")) {
- prefix = getOption("keyPrefix");
+ if (hasOption(KEY_PREFIX_OPTION[0])) {
+ prefix = getOption(KEY_PREFIX_OPTION[0]);
}
- Charset charset = Charset.forName(getOption("charset"));
+ Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
MailOptions options = new MailOptions();
options.setInput(input);
options.setOutputDir(outputDir);
@@ -214,36 +190,40 @@ public final class SequenceFilesFromMail
// would require more processing later to remove it pre feature selection.
Map<String, Integer> patternOrder = Maps.newHashMap();
int order = 0;
- if (hasOption("from")) {
+ if (hasOption(FROM_OPTION[0])) {
patterns.add(MailProcessor.FROM_PREFIX);
patternOrder.put(MailOptions.FROM, order++);
}
- if (hasOption("to")) {
+ if (hasOption(TO_OPTION[0])) {
patterns.add(MailProcessor.TO_PREFIX);
patternOrder.put(MailOptions.TO, order++);
}
- if (hasOption("references")) {
+ if (hasOption(REFERENCES_OPTION[0])) {
patterns.add(MailProcessor.REFS_PREFIX);
patternOrder.put(MailOptions.REFS, order++);
}
- if (hasOption("subject")) {
+ if (hasOption(SUBJECT_OPTION[0])) {
patterns.add(MailProcessor.SUBJECT_PREFIX);
- patternOrder.put(MailOptions.SUBJECT, order++);
+ patternOrder.put(MailOptions.SUBJECT, order += 1);
}
- options.setStripQuotedText(hasOption("stripQuoted"));
+ options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0]));
options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
options.setPatternOrder(patternOrder);
- options.setIncludeBody(hasOption("body"));
- options.setSeparator("\n");
- if (hasOption("separator")) {
- options.setSeparator(getOption("separator"));
+ options.setIncludeBody(hasOption(BODY_OPTION[0]));
+
+ if (hasOption(SEPARATOR_OPTION[0])) {
+ options.setSeparator(getOption(SEPARATOR_OPTION[0]));
+ } else {
+ options.setSeparator("\n");
}
- if (hasOption("bodySeparator")) {
- options.setBodySeparator(getOption("bodySeparator"));
+
+ if (hasOption(BODY_SEPARATOR_OPTION[0])) {
+ options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0]));
}
- if (hasOption("quotedRegex")) {
- options.setQuotedTextPattern(Pattern.compile(getOption("quotedRegex")));
+
+ if (hasOption(QUOTED_REGEX_OPTION[0])) {
+ options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])));
}
if (getOption(DefaultOptionCreator.METHOD_OPTION,
@@ -274,58 +254,64 @@ public final class SequenceFilesFromMail
Configuration jobConfig = job.getConfiguration();
- if (hasOption("keyPrefix")) {
- jobConfig.set("prefix", getOption("keyPrefix"));
+ if (hasOption(KEY_PREFIX_OPTION[0])) {
+ jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0]));
}
int chunkSize = 0;
- if (hasOption("chunkSize")) {
- chunkSize = Integer.parseInt(getOption("chunkSize"));
- jobConfig.set("chunkSize", String.valueOf(chunkSize));
+ if (hasOption(CHUNK_SIZE_OPTION[0])) {
+ chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
+ jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize));
}
Charset charset;
- if (hasOption("charset")) {
- charset = Charset.forName(getOption("charset"));
- jobConfig.set("charset", charset.displayName());
+ if (hasOption(CHARSET_OPTION[0])) {
+ charset = Charset.forName(getOption(CHARSET_OPTION[0]));
+ jobConfig.set(CHARSET_OPTION[0], charset.displayName());
}
- if (hasOption("from")) {
- jobConfig.set("fromOpt", "true");
+ if (hasOption(FROM_OPTION[0])) {
+ jobConfig.set(FROM_OPTION[1], "true");
}
- if (hasOption("to")) {
- jobConfig.set("toOpt", "true");
+ if (hasOption(TO_OPTION[0])) {
+ jobConfig.set(TO_OPTION[1], "true");
}
- if (hasOption("references")) {
- jobConfig.set("refsOpt", "true");
+ if (hasOption(REFERENCES_OPTION[0])) {
+ jobConfig.set(REFERENCES_OPTION[1], "true");
}
- if (hasOption("subject")) {
- jobConfig.set("subjectOpt", "true");
+ if (hasOption(SUBJECT_OPTION[0])) {
+ jobConfig.set(SUBJECT_OPTION[1], "true");
}
- if (hasOption("quotedRegex")) {
- jobConfig.set("quotedRegex", Pattern.compile(getOption("quotedRegex")).toString());
+ if (hasOption(QUOTED_REGEX_OPTION[0])) {
+ jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString());
+ }
+
+ if (hasOption(SEPARATOR_OPTION[0])) {
+ jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0]));
+ } else {
+ jobConfig.set(SEPARATOR_OPTION[1], "\n");
}
- if (hasOption("separatorOpt")) {
- jobConfig.set("separatorOpt", getOption("separatorOpt"));
+ if (hasOption(BODY_OPTION[0])) {
+ jobConfig.set(BODY_OPTION[1], "true");
} else {
- jobConfig.set("separatorOpt", "\n");
+ jobConfig.set(BODY_OPTION[1], "false");
}
- if (hasOption("body")) {
- jobConfig.set("bodyOpt", "true");
+ if (hasOption(BODY_SEPARATOR_OPTION[0])) {
+ jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0]));
} else {
- jobConfig.set("bodyOpt", "false");
+ jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n");
}
FileSystem fs = FileSystem.get(jobConfig);
FileStatus fsFileStatus = fs.getFileStatus(inputPath);
- jobConfig.set("baseinputpath", inputPath.toString());
+ jobConfig.set(BASE_INPUT_PATH, inputPath.toString());
String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
FileInputFormat.setInputPaths(job, inputDirList);
@@ -334,7 +320,7 @@ public final class SequenceFilesFromMail
FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
// set the max split locations, otherwise we get nasty debug stuff
- jobConfig.set("mapreduce.job.max.split.locations", "1000000");
+ jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
boolean succeeded = job.waitForCompletion(true);
if (!succeeded) {
Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java?rev=1496927&r1=1496926&r2=1496927&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java Wed Jun 26 13:34:45 2013
@@ -17,120 +17,131 @@
package org.apache.mahout.text;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.iterator.FileLineIterable;
import org.apache.mahout.utils.email.MailOptions;
import org.apache.mahout.utils.email.MailProcessor;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION;
/**
- *
* Map Class for the SequenceFilesFromMailArchives job
- *
*/
public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
-
+
private Text outKey = new Text();
private Text outValue = new Text();
-
+
private static final Pattern MESSAGE_START = Pattern.compile(
- "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
+ "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile(
- "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
+ "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
private MailOptions options;
-
+
@Override
public void setup(Context context) throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
+ Configuration configuration = context.getConfiguration();
+
// absorb all of the options into the MailOptions object
-
this.options = new MailOptions();
- options.setPrefix(conf.get("prefix", ""));
-
- if (!conf.get("chunkSize", "").equals("")) {
- options.setChunkSize(conf.getInt("chunkSize", 64));
- }
-
- if (!conf.get("charset", "").equals("")) {
- Charset charset = Charset.forName(conf.get("charset", "UTF-8"));
+ options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], ""));
+
+ if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) {
+ options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64));
+ }
+
+ if (!configuration.get(CHARSET_OPTION[0], "").equals("")) {
+ Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8"));
options.setCharset(charset);
} else {
Charset charset = Charset.forName("UTF-8");
options.setCharset(charset);
}
-
+
List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
// patternOrder is used downstream so that we can know what order the
// text is in instead
// of encoding it in the string, which
// would require more processing later to remove it pre feature
// selection.
- Map<String,Integer> patternOrder = Maps.newHashMap();
+ Map<String, Integer> patternOrder = Maps.newHashMap();
int order = 0;
-
- if (!conf.get("fromOpt", "").equals("")) {
+ if (!configuration.get(FROM_OPTION[1], "").equals("")) {
patterns.add(MailProcessor.FROM_PREFIX);
patternOrder.put(MailOptions.FROM, order++);
}
- if (!conf.get("toOpt", "").equals("")) {
+ if (!configuration.get(TO_OPTION[1], "").equals("")) {
patterns.add(MailProcessor.TO_PREFIX);
patternOrder.put(MailOptions.TO, order++);
}
- if (!conf.get("refsOpt", "").equals("")) {
+ if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) {
patterns.add(MailProcessor.REFS_PREFIX);
patternOrder.put(MailOptions.REFS, order++);
}
-
- if (!conf.get("subjectOpt", "").equals("")) {
+
+ if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) {
patterns.add(MailProcessor.SUBJECT_PREFIX);
- patternOrder.put(MailOptions.SUBJECT, order++);
+ patternOrder.put(MailOptions.SUBJECT, order += 1);
}
-
- options.setStripQuotedText(conf.getBoolean("quotedOpt", false));
-
+
+ options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false));
+
options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
options.setPatternOrder(patternOrder);
-
- options.setIncludeBody(conf.getBoolean("bodyOpt", false));
-
+
+ options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false));
+
options.setSeparator("\n");
- if (!conf.get("separatorOpt", "").equals("")) {
- options.setSeparator(conf.get("separatorOpt", ""));
+ if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) {
+ options.setSeparator(configuration.get(SEPARATOR_OPTION[1], ""));
}
- if (!conf.get("bodySeparatorOpt", "").equals("")) {
- options.setBodySeparator(conf.get("bodySeparatorOpt", ""));
+ if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) {
+ options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], ""));
}
- if (!conf.get("quotedRegexOpt", "").equals("")) {
- options.setQuotedTextPattern(Pattern.compile(conf.get("quotedRegexOpt", "")));
+ if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) {
+ options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], "")));
}
}
-
- public long parseMboxLineByLine(String filename, InputStream mboxInputStream, Context context)
+
+ public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context)
throws IOException, InterruptedException {
long messageCount = 0;
try {
@@ -139,19 +150,19 @@ public class SequenceFilesFromMailArchiv
Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
String[] patternResults = new String[options.getPatternsToMatch().length];
- Matcher[] matchers = new Matcher[options.getPatternsToMatch().length];
- for (int i = 0; i < matchers.length; i++) {
- matchers[i] = options.getPatternsToMatch()[i].matcher("");
+ Matcher[] matches = new Matcher[options.getPatternsToMatch().length];
+ for (int i = 0; i < matches.length; i++) {
+ matches[i] = options.getPatternsToMatch()[i].matcher("");
}
-
+
String messageId = null;
boolean inBody = false;
Pattern quotedTextPattern = options.getQuotedTextPattern();
-
- for (String nextLine : new FileLineIterable(mboxInputStream, options.getCharset(), false, filename)) {
+
+ for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) {
if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) {
- for (int i = 0; i < matchers.length; i++) {
- Matcher matcher = matchers[i];
+ for (int i = 0; i < matches.length; i++) {
+ Matcher matcher = matches[i];
matcher.reset(nextLine);
if (matcher.matches()) {
patternResults[i] = matcher.group(1);
@@ -202,7 +213,6 @@ public class SequenceFilesFromMailArchiv
if (messageId != null) {
String key = generateKey(filename, options.getPrefix(), messageId);
writeContent(options.getSeparator(), contents, body, patternResults);
-
this.outKey.set(key);
this.outValue.set(contents.toString());
context.write(this.outKey, this.outValue);
@@ -211,23 +221,16 @@ public class SequenceFilesFromMailArchiv
} catch (FileNotFoundException ignored) {
}
- // TODO: report exceptions and continue;
return messageCount;
}
-
+
protected static String generateKey(String mboxFilename, String prefix, String messageId) {
- return prefix + File.separator + mboxFilename + File.separator + messageId;
+ return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator());
}
-
+
private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) {
- for (String match : matches) {
- if (match != null) {
- contents.append(match).append(separator);
- } else {
- contents.append("").append(separator);
- }
- }
- contents.append(body);
+ String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator());
+ contents.append(matchesString).append(separator).append(body);
}
public void map(IntWritable key, BytesWritable value, Context context)
@@ -236,6 +239,6 @@ public class SequenceFilesFromMailArchiv
Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
- parseMboxLineByLine(relativeFilePath, is, context);
+ parseMailboxLineByLine(relativeFilePath, is, context);
}
}
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java?rev=1496927&r1=1496926&r2=1496927&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/SequenceFilesFromMailArchivesTest.java Wed Jun 26 13:34:45 2013
@@ -38,8 +38,6 @@ import org.junit.Test;
*/
public final class SequenceFilesFromMailArchivesTest extends MahoutTestCase {
- // TODO: Negative tests
-
private File inputDir;
/**
Modified: mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1496927&r1=1496926&r2=1496927&view=diff
==============================================================================
--- mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java Wed Jun 26 13:34:45 2013
@@ -58,9 +58,9 @@ public final class TestSequenceFilesFrom
@Test
public void testSequenceFileFromDirectoryBasic() throws Exception {
// parameters
- Configuration conf = new Configuration();
+ Configuration configuration = new Configuration();
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(configuration);
// create
Path tmpDir = this.getTestTempDirPath();
@@ -74,7 +74,7 @@ public final class TestSequenceFilesFrom
fs.mkdirs(inputDirRecursive);
// prepare input files
- createFilesFromArrays(conf, inputDir, DATA1);
+ createFilesFromArrays(configuration, inputDir, DATA1);
SequenceFilesFromDirectory.main(new String[]{
"--input", inputDir.toString(),
@@ -85,9 +85,9 @@ public final class TestSequenceFilesFrom
"--method", "sequential"});
// check output chunk files
- checkChunkFiles(conf, outputDir, DATA1, "UID");
+ checkChunkFiles(configuration, outputDir, DATA1, "UID");
- createRecursiveDirFilesFromArrays(conf, inputDirRecursive, DATA2);
+ createRecursiveDirFilesFromArrays(configuration, inputDirRecursive, DATA2);
FileStatus fstInputPath = fs.getFileStatus(inputDirRecursive);
String dirs = HadoopUtil.buildDirList(fs, fstInputPath);
@@ -101,7 +101,7 @@ public final class TestSequenceFilesFrom
"--keyPrefix", "UID",
"--method", "sequential"});
- checkRecursiveChunkFiles(conf, outputDirRecursive, DATA2, "UID");
+ checkRecursiveChunkFiles(configuration, outputDirRecursive, DATA2, "UID");
}
@Test
@@ -166,8 +166,9 @@ public final class TestSequenceFilesFrom
}
}
- private static void createRecursiveDirFilesFromArrays(Configuration conf, Path inputDir, String[][] data) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ private static void createRecursiveDirFilesFromArrays(Configuration configuration, Path inputDir,
+ String[][] data) throws IOException {
+ FileSystem fs = FileSystem.get(configuration);
logger.info("creativeRecursiveDirFilesFromArrays > based on: {}", inputDir.toString());
Path curPath;
@@ -193,11 +194,11 @@ public final class TestSequenceFilesFrom
}
}
- private static void checkChunkFiles(Configuration conf,
+ private static void checkChunkFiles(Configuration configuration,
Path outputDir,
String[][] data,
String prefix) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(configuration);
// output exists?
FileStatus[] fileStatuses = fs.listStatus(outputDir, new ExcludeDotFiles());
@@ -210,7 +211,8 @@ public final class TestSequenceFilesFrom
}
// read a chunk to check content
- SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(fileStatuses[0].getPath(), true, conf);
+ SequenceFileIterator<Text, Text> iterator =
+ new SequenceFileIterator<Text, Text>(fileStatuses[0].getPath(), true, configuration);
try {
while (iterator.hasNext()) {
Pair<Text, Text> record = iterator.next();
@@ -233,11 +235,11 @@ public final class TestSequenceFilesFrom
}
}
- private static void checkRecursiveChunkFiles(Configuration conf,
+ private static void checkRecursiveChunkFiles(Configuration configuration,
Path outputDir,
String[][] data,
String prefix) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(configuration);
System.out.println(" ----------- check_Recursive_ChunkFiles ------------");
@@ -255,7 +257,7 @@ public final class TestSequenceFilesFrom
}
// read a chunk to check content
- SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(fileStatuses[0].getPath(), true, conf);
+ SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(fileStatuses[0].getPath(), true, configuration);
try {
while (iterator.hasNext()) {
Pair<Text, Text> record = iterator.next();
@@ -302,9 +304,9 @@ public final class TestSequenceFilesFrom
}
}
- private static void checkMRResultFilesRecursive(Configuration conf, Path outputDir,
+ private static void checkMRResultFilesRecursive(Configuration configuration, Path outputDir,
String[][] data, String prefix) throws IOException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(configuration);
// output exists?
FileStatus[] fileStatuses = fs.listStatus(outputDir.suffix("/part-m-00000"), new ExcludeDotFiles());
@@ -320,7 +322,7 @@ public final class TestSequenceFilesFrom
// read a chunk to check content
SequenceFileIterator<Text, Text> iterator = new SequenceFileIterator<Text, Text>(
- fileStatuses[0].getPath(), true, conf);
+ fileStatuses[0].getPath(), true, configuration);
try {
while (iterator.hasNext()) {
Pair<Text, Text> record = iterator.next();