Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:51:59 UTC
[31/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045
Delete directories which were moved/no longer in use
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java b/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
deleted file mode 100644
index 0ae79ad..0000000
--- a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsMapper.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.clustering.evaluation;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
-import org.apache.mahout.common.ClassUtils;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.distance.DistanceMeasure;
-import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
-import org.apache.mahout.common.iterator.sequencefile.PathFilters;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
-import org.apache.mahout.math.VectorWritable;
-
-public class RepresentativePointsMapper
- extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, WeightedVectorWritable> {
-
- private Map<Integer, List<VectorWritable>> representativePoints;
- private final Map<Integer, WeightedVectorWritable> mostDistantPoints = new HashMap<>();
- private DistanceMeasure measure = new EuclideanDistanceMeasure();
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- for (Map.Entry<Integer, WeightedVectorWritable> entry : mostDistantPoints.entrySet()) {
- context.write(new IntWritable(entry.getKey()), entry.getValue());
- }
- super.cleanup(context);
- }
-
- @Override
- protected void map(IntWritable clusterId, WeightedVectorWritable point, Context context)
- throws IOException, InterruptedException {
- mapPoint(clusterId, point, measure, representativePoints, mostDistantPoints);
- }
-
- public static void mapPoint(IntWritable clusterId,
- WeightedVectorWritable point,
- DistanceMeasure measure,
- Map<Integer, List<VectorWritable>> representativePoints,
- Map<Integer, WeightedVectorWritable> mostDistantPoints) {
- int key = clusterId.get();
- WeightedVectorWritable currentMDP = mostDistantPoints.get(key);
-
- List<VectorWritable> repPoints = representativePoints.get(key);
- double totalDistance = 0.0;
- if (repPoints != null) {
- for (VectorWritable refPoint : repPoints) {
- totalDistance += measure.distance(refPoint.get(), point.getVector());
- }
- }
- if (currentMDP == null || currentMDP.getWeight() < totalDistance) {
- mostDistantPoints.put(key, new WeightedVectorWritable(totalDistance, point.getVector().clone()));
- }
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- Configuration conf = context.getConfiguration();
- measure =
- ClassUtils.instantiateAs(conf.get(RepresentativePointsDriver.DISTANCE_MEASURE_KEY), DistanceMeasure.class);
- representativePoints = getRepresentativePoints(conf);
- }
-
- public void configure(Map<Integer, List<VectorWritable>> referencePoints, DistanceMeasure measure) {
- this.representativePoints = referencePoints;
- this.measure = measure;
- }
-
- public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf) {
- String statePath = conf.get(RepresentativePointsDriver.STATE_IN_KEY);
- return getRepresentativePoints(conf, new Path(statePath));
- }
-
- public static Map<Integer, List<VectorWritable>> getRepresentativePoints(Configuration conf, Path statePath) {
- Map<Integer, List<VectorWritable>> representativePoints = new HashMap<>();
- for (Pair<IntWritable,VectorWritable> record
- : new SequenceFileDirIterable<IntWritable,VectorWritable>(statePath,
- PathType.LIST,
- PathFilters.logsCRCFilter(),
- conf)) {
- int keyValue = record.getFirst().get();
- List<VectorWritable> repPoints = representativePoints.get(keyValue);
- if (repPoints == null) {
- repPoints = new ArrayList<>();
- representativePoints.put(keyValue, repPoints);
- }
- repPoints.add(record.getSecond());
- }
- return representativePoints;
- }
-}
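For reference, a minimal sketch of the most-distant-point bookkeeping the deleted
mapPoint() performed, assuming the Mahout math and integration classes shown above
are still on the classpath; the vector values are illustrative only:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.mahout.clustering.classify.WeightedVectorWritable;
import org.apache.mahout.clustering.evaluation.RepresentativePointsMapper;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.VectorWritable;

public class MapPointSketch {
  public static void main(String[] args) {
    // One representative point for cluster 0, at the origin.
    Map<Integer, List<VectorWritable>> repPoints = new HashMap<>();
    List<VectorWritable> cluster0 = new ArrayList<>();
    cluster0.add(new VectorWritable(new DenseVector(new double[] {0.0, 0.0})));
    repPoints.put(0, cluster0);

    Map<Integer, WeightedVectorWritable> mostDistant = new HashMap<>();
    EuclideanDistanceMeasure measure = new EuclideanDistanceMeasure();

    // Offer two candidate points; the one with the larger total distance wins.
    RepresentativePointsMapper.mapPoint(new IntWritable(0),
        new WeightedVectorWritable(1.0, new DenseVector(new double[] {1.0, 1.0})),
        measure, repPoints, mostDistant);
    RepresentativePointsMapper.mapPoint(new IntWritable(0),
        new WeightedVectorWritable(1.0, new DenseVector(new double[] {5.0, 5.0})),
        measure, repPoints, mostDistant);

    // The stored weight is the total distance to the representative points (~7.07).
    System.out.println(mostDistant.get(0).getWeight());
  }
}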
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java b/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
deleted file mode 100644
index 27ca861..0000000
--- a/integration/src/main/java/org/apache/mahout/clustering/evaluation/RepresentativePointsReducer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.clustering.evaluation;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.clustering.classify.WeightedVectorWritable;
-import org.apache.mahout.math.VectorWritable;
-
-public class RepresentativePointsReducer
- extends Reducer<IntWritable, WeightedVectorWritable, IntWritable, VectorWritable> {
-
- private Map<Integer, List<VectorWritable>> representativePoints;
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- for (Map.Entry<Integer, List<VectorWritable>> entry : representativePoints.entrySet()) {
- IntWritable iw = new IntWritable(entry.getKey());
- for (VectorWritable vw : entry.getValue()) {
- context.write(iw, vw);
- }
- }
- super.cleanup(context);
- }
-
- @Override
- protected void reduce(IntWritable key, Iterable<WeightedVectorWritable> values, Context context)
- throws IOException, InterruptedException {
- // find the most distant point
- WeightedVectorWritable mdp = null;
- for (WeightedVectorWritable dpw : values) {
- if (mdp == null || mdp.getWeight() < dpw.getWeight()) {
- mdp = new WeightedVectorWritable(dpw.getWeight(), dpw.getVector());
- }
- }
- context.write(new IntWritable(key.get()), new VectorWritable(mdp.getVector()));
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- Configuration conf = context.getConfiguration();
- representativePoints = RepresentativePointsMapper.getRepresentativePoints(conf);
- }
-
- public void configure(Map<Integer, List<VectorWritable>> representativePoints) {
- this.representativePoints = representativePoints;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java b/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
deleted file mode 100644
index 392909e..0000000
--- a/integration/src/main/java/org/apache/mahout/clustering/lda/LDAPrintTopics.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.clustering.lda;
-
-import com.google.common.io.Closeables;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import org.apache.commons.cli2.CommandLine;
-import org.apache.commons.cli2.Group;
-import org.apache.commons.cli2.Option;
-import org.apache.commons.cli2.OptionException;
-import org.apache.commons.cli2.builder.ArgumentBuilder;
-import org.apache.commons.cli2.builder.DefaultOptionBuilder;
-import org.apache.commons.cli2.builder.GroupBuilder;
-import org.apache.commons.cli2.commandline.Parser;
-import org.apache.commons.io.Charsets;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.mahout.common.CommandLineUtil;
-import org.apache.mahout.common.IntPairWritable;
-import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.common.iterator.sequencefile.PathType;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
-import org.apache.mahout.utils.vectors.VectorHelper;
-
-/**
- * Class to print out the top K words for each topic.
- */
-public final class LDAPrintTopics {
-
- private LDAPrintTopics() { }
-
- // Expands the queue list to have a Queue for topic K
- private static void ensureQueueSize(Collection<Queue<Pair<String,Double>>> queues, int k) {
- for (int i = queues.size(); i <= k; ++i) {
- queues.add(new PriorityQueue<Pair<String,Double>>());
- }
- }
-
- public static void main(String[] args) throws Exception {
- DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
- ArgumentBuilder abuilder = new ArgumentBuilder();
- GroupBuilder gbuilder = new GroupBuilder();
-
- Option inputOpt = DefaultOptionCreator.inputOption().create();
-
- Option dictOpt = obuilder.withLongName("dict").withRequired(true).withArgument(
- abuilder.withName("dict").withMinimum(1).withMaximum(1).create()).withDescription(
- "Dictionary to read in, in the same format as one created by "
- + "org.apache.mahout.utils.vectors.lucene.Driver").withShortName("d").create();
-
- Option outOpt = DefaultOptionCreator.outputOption().create();
-
- Option wordOpt = obuilder.withLongName("words").withRequired(false).withArgument(
- abuilder.withName("words").withMinimum(0).withMaximum(1).withDefault("20").create()).withDescription(
- "Number of words to print").withShortName("w").create();
- Option dictTypeOpt = obuilder.withLongName("dictionaryType").withRequired(false).withArgument(
- abuilder.withName("dictionaryType").withMinimum(1).withMaximum(1).create()).withDescription(
- "The dictionary file type (text|sequencefile)").withShortName("dt").create();
- Option helpOpt = obuilder.withLongName("help").withDescription("Print out help").withShortName("h")
- .create();
-
- Group group = gbuilder.withName("Options").withOption(dictOpt).withOption(outOpt).withOption(wordOpt)
- .withOption(inputOpt).withOption(dictTypeOpt).create();
- try {
- Parser parser = new Parser();
- parser.setGroup(group);
- CommandLine cmdLine = parser.parse(args);
-
- if (cmdLine.hasOption(helpOpt)) {
- CommandLineUtil.printHelp(group);
- return;
- }
-
- String input = cmdLine.getValue(inputOpt).toString();
- String dictFile = cmdLine.getValue(dictOpt).toString();
- int numWords = 20;
- if (cmdLine.hasOption(wordOpt)) {
- numWords = Integer.parseInt(cmdLine.getValue(wordOpt).toString());
- }
- Configuration config = new Configuration();
-
- String dictionaryType = "text";
- if (cmdLine.hasOption(dictTypeOpt)) {
- dictionaryType = cmdLine.getValue(dictTypeOpt).toString();
- }
-
- List<String> wordList;
- if ("text".equals(dictionaryType)) {
- wordList = Arrays.asList(VectorHelper.loadTermDictionary(new File(dictFile)));
- } else if ("sequencefile".equals(dictionaryType)) {
- wordList = Arrays.asList(VectorHelper.loadTermDictionary(config, dictFile));
- } else {
- throw new IllegalArgumentException("Invalid dictionary format");
- }
-
- List<Queue<Pair<String,Double>>> topWords = topWordsForTopics(input, config, wordList, numWords);
-
- File output = null;
- if (cmdLine.hasOption(outOpt)) {
- output = new File(cmdLine.getValue(outOpt).toString());
- if (!output.exists() && !output.mkdirs()) {
- throw new IOException("Could not create directory: " + output);
- }
- }
- printTopWords(topWords, output);
- } catch (OptionException e) {
- CommandLineUtil.printHelp(group);
- throw e;
- }
- }
-
- // Adds the word if the queue is below capacity, or the score is high enough
- private static void maybeEnqueue(Queue<Pair<String,Double>> q, String word, double score, int numWordsToPrint) {
- if (q.size() >= numWordsToPrint && score > q.peek().getSecond()) {
- q.poll();
- }
- if (q.size() < numWordsToPrint) {
- q.add(new Pair<>(word, score));
- }
- }
-
- private static void printTopWords(List<Queue<Pair<String,Double>>> topWords, File outputDir)
- throws IOException {
- for (int i = 0; i < topWords.size(); ++i) {
- Collection<Pair<String,Double>> topK = topWords.get(i);
- Writer out = null;
- boolean printingToSystemOut = false;
- try {
- if (outputDir != null) {
- out = new OutputStreamWriter(new FileOutputStream(new File(outputDir, "topic_" + i)), Charsets.UTF_8);
- } else {
- out = new OutputStreamWriter(System.out, Charsets.UTF_8);
- printingToSystemOut = true;
- out.write("Topic " + i);
- out.write('\n');
- out.write("===========");
- out.write('\n');
- }
- List<Pair<String,Double>> topKasList = new ArrayList<>(topK.size());
- for (Pair<String,Double> wordWithScore : topK) {
- topKasList.add(wordWithScore);
- }
- Collections.sort(topKasList, new Comparator<Pair<String,Double>>() {
- @Override
- public int compare(Pair<String,Double> pair1, Pair<String,Double> pair2) {
- return pair2.getSecond().compareTo(pair1.getSecond());
- }
- });
- for (Pair<String,Double> wordWithScore : topKasList) {
- out.write(wordWithScore.getFirst() + " [p(" + wordWithScore.getFirst() + "|topic_" + i + ") = "
- + wordWithScore.getSecond() + ']');
- out.write('\n');
- }
- } finally {
- if (!printingToSystemOut) {
- Closeables.close(out, false);
- } else {
- out.flush();
- }
- }
- }
- }
-
- private static List<Queue<Pair<String,Double>>> topWordsForTopics(String dir,
- Configuration job,
- List<String> wordList,
- int numWordsToPrint) {
- List<Queue<Pair<String,Double>>> queues = new ArrayList<>();
- Map<Integer,Double> expSums = new HashMap<>();
- for (Pair<IntPairWritable,DoubleWritable> record
- : new SequenceFileDirIterable<IntPairWritable, DoubleWritable>(
- new Path(dir, "part-*"), PathType.GLOB, null, null, true, job)) {
- IntPairWritable key = record.getFirst();
- int topic = key.getFirst();
- int word = key.getSecond();
- ensureQueueSize(queues, topic);
- if (word >= 0 && topic >= 0) {
- double score = record.getSecond().get();
- if (expSums.get(topic) == null) {
- expSums.put(topic, 0.0);
- }
- expSums.put(topic, expSums.get(topic) + Math.exp(score));
- String realWord = wordList.get(word);
- maybeEnqueue(queues.get(topic), realWord, score, numWordsToPrint);
- }
- }
- for (int i = 0; i < queues.size(); i++) {
- Queue<Pair<String,Double>> queue = queues.get(i);
- Queue<Pair<String,Double>> newQueue = new PriorityQueue<>(queue.size());
- double norm = expSums.get(i);
- for (Pair<String,Double> pair : queue) {
- newQueue.add(new Pair<>(pair.getFirst(), Math.exp(pair.getSecond()) / norm));
- }
- queues.set(i, newQueue);
- }
- return queues;
- }
-}
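The bounded min-heap in maybeEnqueue() above carries the whole top-K selection: the
head of the PriorityQueue is always the lowest-scoring word, so a new word enters
only after the weakest entry is evicted. A self-contained sketch of the same
pattern (hypothetical class and names, plain Java):

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

public class TopKWordsSketch {
  static final class Scored {
    final String word;
    final double score;
    Scored(String word, double score) { this.word = word; this.score = score; }
  }

  static void maybeEnqueue(Queue<Scored> q, String word, double score, int k) {
    if (q.size() >= k && score > q.peek().score) {
      q.poll(); // evict the current minimum to make room for a better word
    }
    if (q.size() < k) {
      q.add(new Scored(word, score));
    }
  }

  public static void main(String[] args) {
    Queue<Scored> q = new PriorityQueue<>(Comparator.comparingDouble((Scored s) -> s.score));
    maybeEnqueue(q, "alpha", 0.1, 2);
    maybeEnqueue(q, "beta", 0.5, 2);
    maybeEnqueue(q, "gamma", 0.3, 2); // evicts "alpha"
    for (Scored s : q) {
      System.out.println(s.word + " " + s.score); // "beta" and "gamma", in no particular order
    }
  }
}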
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java b/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
deleted file mode 100644
index 12ed471..0000000
--- a/integration/src/main/java/org/apache/mahout/text/MailArchivesClusteringAnalyzer.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.text;
-
-import org.apache.lucene.analysis.TokenFilter;
-import org.apache.lucene.analysis.TokenStream;
-import org.apache.lucene.analysis.Tokenizer;
-import org.apache.lucene.analysis.core.LowerCaseFilter;
-import org.apache.lucene.analysis.core.StopFilter;
-import org.apache.lucene.analysis.en.PorterStemFilter;
-import org.apache.lucene.analysis.miscellaneous.ASCIIFoldingFilter;
-import org.apache.lucene.analysis.standard.StandardFilter;
-import org.apache.lucene.analysis.standard.StandardTokenizer;
-import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
-import org.apache.lucene.analysis.util.CharArraySet;
-import org.apache.lucene.analysis.util.StopwordAnalyzerBase;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Custom Lucene Analyzer designed for aggressive feature reduction
- * for clustering the ASF Mail Archives, using an extended set of
- * stop words, exclusion of non-alpha-numeric tokens, and Porter stemming.
- */
-public final class MailArchivesClusteringAnalyzer extends StopwordAnalyzerBase {
- // extended set of stop words composed of common mail terms like "hi",
- // HTML tags, and Java keywords asmany of the messages in the archives
- // are subversion check-in notifications
-
- private static final CharArraySet STOP_SET = new CharArraySet(Arrays.asList(
- "3d","7bit","a0","about","above","abstract","across","additional","after",
- "afterwards","again","against","align","all","almost","alone","along",
- "already","also","although","always","am","among","amongst","amoungst",
- "amount","an","and","another","any","anybody","anyhow","anyone","anything",
- "anyway","anywhere","are","arial","around","as","ascii","assert","at",
- "back","background","base64","bcc","be","became","because","become","becomes",
- "becoming","been","before","beforehand","behind","being","below","beside",
- "besides","between","beyond","bgcolor","blank","blockquote","body","boolean",
- "border","both","br","break","but","by","can","cannot","cant","case","catch",
- "cc","cellpadding","cellspacing","center","char","charset","cheers","class",
- "co","color","colspan","com","con","const","continue","could","couldnt",
- "cry","css","de","dear","default","did","didnt","different","div","do",
- "does","doesnt","done","dont","double","down","due","during","each","eg",
- "eight","either","else","elsewhere","empty","encoding","enough","enum",
- "etc","eu","even","ever","every","everyone","everything","everywhere",
- "except","extends","face","family","few","ffffff","final","finally","float",
- "font","for","former","formerly","fri","from","further","get","give","go",
- "good","got","goto","gt","h1","ha","had","has","hasnt","have","he","head",
- "height","hello","helvetica","hence","her","here","hereafter","hereby",
- "herein","hereupon","hers","herself","hi","him","himself","his","how",
- "however","hr","href","html","http","https","id","ie","if","ill","im",
- "image","img","implements","import","in","inc","instanceof","int","interface",
- "into","is","isnt","iso-8859-1","it","its","itself","ive","just","keep",
- "last","latter","latterly","least","left","less","li","like","long","look",
- "lt","ltd","mail","mailto","many","margin","may","me","meanwhile","message",
- "meta","might","mill","mine","mon","more","moreover","most","mostly","mshtml",
- "mso","much","must","my","myself","name","namely","native","nbsp","need",
- "neither","never","nevertheless","new","next","nine","no","nobody","none",
- "noone","nor","not","nothing","now","nowhere","null","of","off","often",
- "ok","on","once","only","onto","or","org","other","others","otherwise",
- "our","ours","ourselves","out","over","own","package","pad","per","perhaps",
- "plain","please","pm","printable","private","protected","public","put",
- "quot","quote","r1","r2","rather","re","really","regards","reply","return",
- "right","said","same","sans","sat","say","saying","see","seem","seemed",
- "seeming","seems","serif","serious","several","she","short","should","show",
- "side","since","sincere","six","sixty","size","so","solid","some","somehow",
- "someone","something","sometime","sometimes","somewhere","span","src",
- "static","still","strictfp","string","strong","style","stylesheet","subject",
- "such","sun","super","sure","switch","synchronized","table","take","target",
- "td","text","th","than","thanks","that","the","their","them","themselves",
- "then","thence","there","thereafter","thereby","therefore","therein","thereupon",
- "these","they","thick","thin","think","third","this","those","though",
- "three","through","throughout","throw","throws","thru","thu","thus","tm",
- "to","together","too","top","toward","towards","tr","transfer","transient",
- "try","tue","type","ul","un","under","unsubscribe","until","up","upon",
- "us","use","used","uses","using","valign","verdana","very","via","void",
- "volatile","want","was","we","wed","weight","well","were","what","whatever",
- "when","whence","whenever","where","whereafter","whereas","whereby","wherein",
- "whereupon","wherever","whether","which","while","whither","who","whoever",
- "whole","whom","whose","why","width","will","with","within","without",
- "wont","would","wrote","www","yes","yet","you","your","yours","yourself",
- "yourselves"
- ), false);
-
- // Regex used to exclude non-alpha-numeric tokens
- private static final Pattern ALPHA_NUMERIC = Pattern.compile("^[a-z][a-z0-9_]+$");
- private static final Matcher MATCHER = ALPHA_NUMERIC.matcher("");
-
- public MailArchivesClusteringAnalyzer() {
- super(STOP_SET);
- }
-
- public MailArchivesClusteringAnalyzer(CharArraySet stopSet) {
- super(stopSet);
- }
-
- @Override
- protected TokenStreamComponents createComponents(String fieldName) {
- Tokenizer tokenizer = new StandardTokenizer();
- TokenStream result = new StandardFilter(tokenizer);
- result = new LowerCaseFilter(result);
- result = new ASCIIFoldingFilter(result);
- result = new AlphaNumericMaxLengthFilter(result);
- result = new StopFilter(result, STOP_SET);
- result = new PorterStemFilter(result);
- return new TokenStreamComponents(tokenizer, result);
- }
-
- /**
- * Matches alpha-numeric tokens between 2 and 28 chars long.
- */
- static class AlphaNumericMaxLengthFilter extends TokenFilter {
- private final CharTermAttribute termAtt;
- private final char[] output = new char[28];
-
- AlphaNumericMaxLengthFilter(TokenStream in) {
- super(in);
- termAtt = addAttribute(CharTermAttribute.class);
- }
-
- @Override
- public final boolean incrementToken() throws IOException {
- // return the first alpha-numeric token between 2 and 28 chars long
- while (input.incrementToken()) {
- int length = termAtt.length();
- if (length >= 2 && length <= 28) {
- char[] buf = termAtt.buffer();
- int at = 0;
- for (int c = 0; c < length; c++) {
- char ch = buf[c];
- if (ch != '\'') {
- output[at++] = ch;
- }
- }
- String term = new String(output, 0, at);
- MATCHER.reset(term);
- if (MATCHER.matches() && !term.startsWith("a0")) {
- termAtt.setEmpty();
- termAtt.append(term);
- return true;
- }
- }
- }
- return false;
- }
- }
-}
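A minimal sketch of running the analyzer over a snippet of mail text, assuming a
Lucene version matching the imports above (the tokenStream/CharTermAttribute flow
is standard Lucene API); the input string and the stems shown are illustrative:

import java.io.IOException;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.mahout.text.MailArchivesClusteringAnalyzer;

public class AnalyzerSketch {
  public static void main(String[] args) throws IOException {
    try (Analyzer analyzer = new MailArchivesClusteringAnalyzer()) {
      TokenStream ts = analyzer.tokenStream("body", "Hi all, the Subversion check-in was committed!");
      CharTermAttribute term = ts.addAttribute(CharTermAttribute.class);
      ts.reset();
      while (ts.incrementToken()) {
        // Stop words like "hi" and "the" are dropped; survivors are stemmed,
        // e.g. "subvers", "check", "commit".
        System.out.println(term.toString());
      }
      ts.end();
      ts.close();
    }
  }
}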
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java b/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
deleted file mode 100644
index 44df006..0000000
--- a/integration/src/main/java/org/apache/mahout/text/MultipleTextFileInputFormat.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-
-/**
- * Combines a large number of small text files into shared input splits, reading
- * each file as a single whole record via the WholeFileRecordReader class.
- */
-public class MultipleTextFileInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> {
-
- @Override
- public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext)
- throws IOException {
- return new CombineFileRecordReader<>((CombineFileSplit) inputSplit,
- taskAttemptContext, WholeFileRecordReader.class);
- }
-}
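A minimal sketch (hypothetical driver class and paths) of wiring this input format
into a job so that many small files are packed into combined splits and read
whole-file at a time through WholeFileRecordReader; the split-size call mirrors the
one used by SequenceFilesFromDirectory below:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.mahout.text.MultipleTextFileInputFormat;

public class CombineFormatSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "combine-small-files");
    job.setInputFormatClass(MultipleTextFileInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/tmp/small-files")); // hypothetical input
    // Cap each combined split so one split holds many small files up to this size.
    FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
  }
}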
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java b/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
deleted file mode 100644
index 37ebc44..0000000
--- a/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.mahout.common.iterator.FileLineIterable;
-import org.apache.mahout.utils.io.ChunkedWriter;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-/**
- * Default filter used to parse text files into sequence files, keying each file
- * by its prefixed relative path.
- */
-public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter {
-
- public PrefixAdditionFilter(Configuration conf,
- String keyPrefix,
- Map<String, String> options,
- ChunkedWriter writer,
- Charset charset,
- FileSystem fs) {
- super(conf, keyPrefix, options, writer, charset, fs);
- }
-
- @Override
- protected void process(FileStatus fst, Path current) throws IOException {
- FileSystem fs = getFs();
- ChunkedWriter writer = getWriter();
- if (fst.isDir()) {
- String dirPath = getPrefix() + Path.SEPARATOR + current.getName() + Path.SEPARATOR + fst.getPath().getName();
- fs.listStatus(fst.getPath(),
- new PrefixAdditionFilter(getConf(), dirPath, getOptions(), writer, getCharset(), fs));
- } else {
- try (InputStream in = fs.open(fst.getPath())){
- StringBuilder file = new StringBuilder();
- for (String aFit : new FileLineIterable(in, getCharset(), false)) {
- file.append(aFit).append('\n');
- }
- String name = current.getName().equals(fst.getPath().getName())
- ? current.getName()
- : current.getName() + Path.SEPARATOR + fst.getPath().getName();
- writer.write(getPrefix() + Path.SEPARATOR + name, file.toString());
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
deleted file mode 100644
index 311ab8d..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.ClassUtils;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.utils.io.ChunkedWriter;
-
-/**
- * Converts a directory of text documents into SequenceFiles of specified chunkSize. This class takes in a
- * parent directory containing subfolders of text documents and recursively reads the files, creating
- * {@link org.apache.hadoop.io.SequenceFile}s of docid => content. The docid is the relative path of the
- * document from the parent directory, prepended with a specified prefix. You can also specify the input encoding
- * of the text files. The content of the output SequenceFiles is encoded as UTF-8 text.
- */
-public class SequenceFilesFromDirectory extends AbstractJob {
-
- private static final String PREFIX_ADDITION_FILTER = PrefixAdditionFilter.class.getName();
-
- private static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
- public static final String[] FILE_FILTER_CLASS_OPTION = {"fileFilterClass", "filter"};
- private static final String[] CHARSET_OPTION = {"charset", "c"};
-
- private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
-
- public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
- public static final String BASE_INPUT_PATH = "baseinputpath";
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new SequenceFilesFromDirectory(), args);
- }
-
- /*
- * Invoked by ToolRunner after the generic MapReduce parameters have been processed.
- */
- @Override
- public int run(String[] args) throws Exception {
- addOptions();
- addOption(DefaultOptionCreator.methodOption().create());
- addOption(DefaultOptionCreator.overwriteOption().create());
-
- if (parseArguments(args) == null) {
- return -1;
- }
-
- Map<String, String> options = parseOptions();
- Path output = getOutputPath();
- if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
- HadoopUtil.delete(getConf(), output);
- }
-
- if (getOption(DefaultOptionCreator.METHOD_OPTION,
- DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
- runSequential(getConf(), getInputPath(), output, options);
- } else {
- runMapReduce(getInputPath(), output);
- }
-
- return 0;
- }
-
- private int runSequential(Configuration conf, Path input, Path output, Map<String, String> options)
- throws IOException, InterruptedException, NoSuchMethodException {
- // Running sequentially
- Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
- String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
- FileSystem fs = FileSystem.get(input.toUri(), conf);
-
- try (ChunkedWriter writer = new ChunkedWriter(conf, Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output)) {
- SequenceFilesFromDirectoryFilter pathFilter;
- String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
- if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
- pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, charset, fs);
- } else {
- pathFilter = ClassUtils.instantiateAs(fileFilterClassName, SequenceFilesFromDirectoryFilter.class,
- new Class[] {Configuration.class, String.class, Map.class, ChunkedWriter.class, Charset.class, FileSystem.class},
- new Object[] {conf, keyPrefix, options, writer, charset, fs});
- }
- fs.listStatus(input, pathFilter);
- }
- return 0;
- }
-
- private int runMapReduce(Path input, Path output) throws IOException, ClassNotFoundException, InterruptedException {
-
- int chunkSizeInMB = 64;
- if (hasOption(CHUNK_SIZE_OPTION[0])) {
- chunkSizeInMB = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
- }
-
- String keyPrefix = null;
- if (hasOption(KEY_PREFIX_OPTION[0])) {
- keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
- }
-
- String fileFilterClassName = null;
- if (hasOption(FILE_FILTER_CLASS_OPTION[0])) {
- fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
- }
-
- PathFilter pathFilter = null;
- // Prefix addition is presently handled in the mapper and, unlike in runSequential(),
- // need not be done via a PathFilter
- if (!StringUtils.isBlank(fileFilterClassName) && !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
- try {
- pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
- } catch (InstantiationException | IllegalAccessException e) {
- throw new IllegalStateException(e);
- }
- }
-
- // Prepare Job for submission.
- Job job = prepareJob(input, output, MultipleTextFileInputFormat.class,
- SequenceFilesFromDirectoryMapper.class, Text.class, Text.class,
- SequenceFileOutputFormat.class, "SequenceFilesFromDirectory");
-
- Configuration jobConfig = job.getConfiguration();
- jobConfig.set(KEY_PREFIX_OPTION[0], keyPrefix);
- jobConfig.set(FILE_FILTER_CLASS_OPTION[0], fileFilterClassName);
-
- FileSystem fs = FileSystem.get(jobConfig);
- FileStatus fsFileStatus = fs.getFileStatus(input);
-
- String inputDirList;
- if (pathFilter != null) {
- inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus, pathFilter);
- } else {
- inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
- }
-
- jobConfig.set(BASE_INPUT_PATH, input.toString());
-
- long chunkSizeInBytes = chunkSizeInMB * 1024L * 1024; // long multiplier avoids int overflow for large chunk sizes
-
- // raise the max split locations limit, otherwise Hadoop logs noisy warnings on large inputs
- jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
-
- FileInputFormat.setInputPaths(job, inputDirList);
- // need to set this to a multiple of the block size, or no split happens
- FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
- FileOutputFormat.setCompressOutput(job, true);
-
- boolean succeeded = job.waitForCompletion(true);
- if (!succeeded) {
- return -1;
- }
- return 0;
- }
-
- /**
- * Override this method in order to add additional options to the command line of the SequenceFilesFromDirectory job.
- * Do not forget to call super.addOptions(), otherwise all standard options (input/output dirs etc.) will not be available.
- */
- protected void addOptions() {
- addInputOption();
- addOutputOption();
- addOption(DefaultOptionCreator.overwriteOption().create());
- addOption(DefaultOptionCreator.methodOption().create());
- addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
- addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
- "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
- addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
- addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
- "The name of the character encoding of the input files. Defaults to UTF-8", "UTF-8");
- }
-
- /**
- * Override this method in order to parse your additional options from the command line. Do not forget to call
- * super.parseOptions(), otherwise standard options (input/output dirs etc.) will not be available.
- *
- * @return Map of options
- */
- protected Map<String, String> parseOptions() {
- Map<String, String> options = new HashMap<>();
- options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0]));
- options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0]));
- options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0]));
- return options;
- }
-}
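A minimal sketch of invoking the (now removed) tool programmatically, assuming the
standard long option names registered in addOptions() above; the paths are
hypothetical placeholders:

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.text.SequenceFilesFromDirectory;

public class SeqFilesFromDirectorySketch {
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new SequenceFilesFromDirectory(), new String[] {
        "--input", "/tmp/docs",       // hypothetical parent directory of text files
        "--output", "/tmp/seqfiles",  // hypothetical output path
        "--method", "sequential",     // or "mapreduce"
        "--chunkSize", "64",
        "--charset", "UTF-8"
    });
    System.exit(exitCode);
  }
}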
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
deleted file mode 100644
index 6e4bd64..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.mahout.utils.io.ChunkedWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.Map;
-
-/**
- * Extend this class if you wish to add your own parsing logic to SequenceFilesFromDirectory.
- */
-public abstract class SequenceFilesFromDirectoryFilter implements PathFilter {
- private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class);
-
- private final String prefix;
- private final ChunkedWriter writer;
- private final Charset charset;
- private final FileSystem fs;
- private final Map<String, String> options;
- private final Configuration conf;
-
- protected SequenceFilesFromDirectoryFilter(Configuration conf,
- String keyPrefix,
- Map<String, String> options,
- ChunkedWriter writer,
- Charset charset,
- FileSystem fs) {
- this.prefix = keyPrefix;
- this.writer = writer;
- this.charset = charset;
- this.fs = fs;
- this.options = options;
- this.conf = conf;
- }
-
- protected final String getPrefix() {
- return prefix;
- }
-
- protected final ChunkedWriter getWriter() {
- return writer;
- }
-
- protected final Charset getCharset() {
- return charset;
- }
-
- protected final FileSystem getFs() {
- return fs;
- }
-
- protected final Map<String, String> getOptions() {
- return options;
- }
-
- protected final Configuration getConf() {
- return conf;
- }
-
- @Override
- public final boolean accept(Path current) {
- log.debug("CURRENT: {}", current.getName());
- try {
- for (FileStatus fst : fs.listStatus(current)) {
- log.debug("CHILD: {}", fst.getPath().getName());
- process(fst, current);
- }
- } catch (IOException ioe) {
- throw new IllegalStateException(ioe);
- }
- return false;
- }
-
- protected abstract void process(FileStatus in, Path current) throws IOException;
-}
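A minimal sketch (hypothetical class name) of the extension point this base class
provides: subclass it and implement process() with your own per-file logic, using
the protected accessors for the writer, prefix, and filesystem:

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.mahout.text.SequenceFilesFromDirectoryFilter;
import org.apache.mahout.utils.io.ChunkedWriter;

public final class FileNameOnlyFilter extends SequenceFilesFromDirectoryFilter {

  public FileNameOnlyFilter(Configuration conf, String keyPrefix, Map<String, String> options,
                            ChunkedWriter writer, Charset charset, FileSystem fs) {
    super(conf, keyPrefix, options, writer, charset, fs);
  }

  @Override
  protected void process(FileStatus fst, Path current) throws IOException {
    // Emit one record per regular file; a real filter would read the file content.
    if (!fst.isDir()) {
      getWriter().write(getPrefix() + Path.SEPARATOR + fst.getPath().getName(),
          fst.getPath().toString());
    }
  }
}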
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
deleted file mode 100644
index 40df3c2..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryMapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.mahout.common.HadoopUtil;
-
-import static org.apache.mahout.text.SequenceFilesFromDirectory.KEY_PREFIX_OPTION;
-
-/**
- * Map class for SequenceFilesFromDirectory MR job
- */
-public class SequenceFilesFromDirectoryMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
-
- private String keyPrefix;
- private Text fileValue = new Text();
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.setup(context);
- this.keyPrefix = context.getConfiguration().get(KEY_PREFIX_OPTION[0], "");
- }
-
- public void map(IntWritable key, BytesWritable value, Context context)
- throws IOException, InterruptedException {
-
- Configuration configuration = context.getConfiguration();
- Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
- String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
-
- String filename = this.keyPrefix.length() > 0 ?
- this.keyPrefix + Path.SEPARATOR + relativeFilePath :
- Path.SEPARATOR + relativeFilePath;
-
- fileValue.set(value.getBytes(), 0, value.getLength()); // getLength(), not getBytes().length: the backing buffer may be padded
- context.write(new Text(filename), fileValue);
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
deleted file mode 100644
index c17cc12..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mahout.text;
-
-import org.apache.commons.io.DirectoryWalker;
-import org.apache.commons.io.comparator.CompositeFileComparator;
-import org.apache.commons.io.comparator.DirectoryFileComparator;
-import org.apache.commons.io.comparator.PathFileComparator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.apache.mahout.utils.email.MailOptions;
-import org.apache.mahout.utils.email.MailProcessor;
-import org.apache.mahout.utils.io.ChunkedWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-/**
- * Converts a directory of gzipped mail archives into SequenceFiles of specified
- * chunkSize. This class is similar to {@link SequenceFilesFromDirectory} except
- * it uses block-compressed {@link org.apache.hadoop.io.SequenceFile}s and parses out the subject and
- * body text of each mail message into a separate key/value pair.
- */
-public final class SequenceFilesFromMailArchives extends AbstractJob {
-
- private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromMailArchives.class);
-
- public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
- public static final String[] KEY_PREFIX_OPTION = {"keyPrefix", "prefix"};
- public static final String[] CHARSET_OPTION = {"charset", "c"};
- public static final String[] SUBJECT_OPTION = {"subject", "s"};
- public static final String[] TO_OPTION = {"to", "to"};
- public static final String[] FROM_OPTION = {"from", "from"};
- public static final String[] REFERENCES_OPTION = {"references", "refs"};
- public static final String[] BODY_OPTION = {"body", "b"};
- public static final String[] STRIP_QUOTED_OPTION = {"stripQuoted", "q"};
- public static final String[] QUOTED_REGEX_OPTION = {"quotedRegex", "regex"};
- public static final String[] SEPARATOR_OPTION = {"separator", "sep"};
- public static final String[] BODY_SEPARATOR_OPTION = {"bodySeparator", "bodySep"};
- public static final String BASE_INPUT_PATH = "baseinputpath";
-
- private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
-
- public void createSequenceFiles(MailOptions options) throws IOException {
- try (ChunkedWriter writer =
- new ChunkedWriter(getConf(), options.getChunkSize(), new Path(options.getOutputDir()))){
- MailProcessor processor = new MailProcessor(options, options.getPrefix(), writer);
- if (options.getInput().isDirectory()) {
- PrefixAdditionDirectoryWalker walker = new PrefixAdditionDirectoryWalker(processor, writer);
- walker.walk(options.getInput());
- log.info("Parsed {} messages from {}", walker.getMessageCount(), options.getInput().getAbsolutePath());
- } else {
- long start = System.currentTimeMillis();
- long cnt = processor.parseMboxLineByLine(options.getInput());
- long finish = System.currentTimeMillis();
- log.info("Parsed {} messages from {} in time: {}", cnt, options.getInput().getAbsolutePath(), finish - start);
- }
- }
- }
-
- private static class PrefixAdditionDirectoryWalker extends DirectoryWalker<Object> {
-
- @SuppressWarnings("unchecked")
- private static final Comparator<File> FILE_COMPARATOR = new CompositeFileComparator(
- DirectoryFileComparator.DIRECTORY_REVERSE, PathFileComparator.PATH_COMPARATOR);
-
- private final Deque<MailProcessor> processors = new ArrayDeque<>();
- private final ChunkedWriter writer;
- private final Deque<Long> messageCounts = new ArrayDeque<>();
-
- public PrefixAdditionDirectoryWalker(MailProcessor processor, ChunkedWriter writer) {
- processors.addFirst(processor);
- this.writer = writer;
- messageCounts.addFirst(0L);
- }
-
- public void walk(File startDirectory) throws IOException {
- super.walk(startDirectory, null);
- }
-
- public long getMessageCount() {
- return messageCounts.getFirst();
- }
-
- @Override
- protected void handleDirectoryStart(File current, int depth, Collection<Object> results) throws IOException {
- if (depth > 0) {
- log.info("At {}", current.getAbsolutePath());
- MailProcessor processor = processors.getFirst();
- MailProcessor subDirProcessor = new MailProcessor(processor.getOptions(), processor.getPrefix()
- + File.separator + current.getName(), writer);
- processors.push(subDirProcessor);
- messageCounts.push(0L);
- }
- }
-
- @Override
- protected File[] filterDirectoryContents(File directory, int depth, File[] files) throws IOException {
- Arrays.sort(files, FILE_COMPARATOR);
- return files;
- }
-
- @Override
- protected void handleFile(File current, int depth, Collection<Object> results) throws IOException {
- MailProcessor processor = processors.getFirst();
- long currentDirMessageCount = messageCounts.pop();
- try {
- currentDirMessageCount += processor.parseMboxLineByLine(current);
- } catch (IOException e) {
- throw new IllegalStateException("Error processing " + current, e);
- }
- messageCounts.push(currentDirMessageCount);
- }
-
- @Override
- protected void handleDirectoryEnd(File current, int depth, Collection<Object> results) throws IOException {
- if (depth > 0) {
- final long currentDirMessageCount = messageCounts.pop();
- log.info("Parsed {} messages from directory {}", currentDirMessageCount, current.getAbsolutePath());
-
- processors.pop();
-
- // aggregate message counts
- long parentDirMessageCount = messageCounts.pop();
- parentDirMessageCount += currentDirMessageCount;
- messageCounts.push(parentDirMessageCount);
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), args);
- }
-
- @Override
- public int run(String[] args) throws Exception {
- addInputOption();
- addOutputOption();
- addOption(DefaultOptionCreator.methodOption().create());
-
- addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in MegaBytes. Defaults to 64", "64");
- addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be prepended to the key", "");
- addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
- "The name of the character encoding of the input files. Defaults to UTF-8", "UTF-8");
- addFlag(SUBJECT_OPTION[0], SUBJECT_OPTION[1], "Include the Mail subject as part of the text. Default is false");
- addFlag(TO_OPTION[0], TO_OPTION[1], "Include the to field in the text. Default is false");
- addFlag(FROM_OPTION[0], FROM_OPTION[1], "Include the from field in the text. Default is false");
- addFlag(REFERENCES_OPTION[0], REFERENCES_OPTION[1],
- "Include the references field in the text. Default is false");
- addFlag(BODY_OPTION[0], BODY_OPTION[1], "Include the body in the output. Default is false");
- addFlag(STRIP_QUOTED_OPTION[0], STRIP_QUOTED_OPTION[1],
- "Strip (remove) quoted email text in the body. Default is false");
- addOption(QUOTED_REGEX_OPTION[0], QUOTED_REGEX_OPTION[1],
- "Specify the regex that identifies quoted text. "
- + "Default is to look for > or | at the beginning of the line.");
- addOption(SEPARATOR_OPTION[0], SEPARATOR_OPTION[1],
- "The separator to use between metadata items (to, from, etc.). Default is \\n", "\n");
- addOption(BODY_SEPARATOR_OPTION[0], BODY_SEPARATOR_OPTION[1],
- "The separator to use between lines in the body. Default is \\n. "
- + "Useful if you want the entire message on a single line", "\n");
-
- addOption(DefaultOptionCreator.helpOption());
- Map<String, List<String>> parsedArgs = parseArguments(args);
- if (parsedArgs == null) {
- return -1;
- }
- File input = getInputFile();
- String outputDir = getOutputPath().toString();
-
- int chunkSize = 64;
- if (hasOption(CHUNK_SIZE_OPTION[0])) {
- chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
- }
-
- String prefix = "";
- if (hasOption(KEY_PREFIX_OPTION[0])) {
- prefix = getOption(KEY_PREFIX_OPTION[0]);
- }
-
- Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
- MailOptions options = new MailOptions();
- options.setInput(input);
- options.setOutputDir(outputDir);
- options.setPrefix(prefix);
- options.setChunkSize(chunkSize);
- options.setCharset(charset);
-
- List<Pattern> patterns = new ArrayList<>(5);
- // patternOrder records, for downstream consumers, the order in which the
- // matched fields appear in the output text, instead of encoding field labels
- // in the string itself, which would need extra processing to strip them out
- // before feature selection.
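- // Illustrative: with the from and subject options enabled, patternOrder maps
- // MailOptions.FROM -> 0 and MailOptions.SUBJECT -> 1, so a consumer can split
- // the value on the separator and pick each field out by position.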
- Map<String, Integer> patternOrder = new HashMap<>();
- int order = 0;
- if (hasOption(FROM_OPTION[0])) {
- patterns.add(MailProcessor.FROM_PREFIX);
- patternOrder.put(MailOptions.FROM, order++);
- }
- if (hasOption(TO_OPTION[0])) {
- patterns.add(MailProcessor.TO_PREFIX);
- patternOrder.put(MailOptions.TO, order++);
- }
- if (hasOption(REFERENCES_OPTION[0])) {
- patterns.add(MailProcessor.REFS_PREFIX);
- patternOrder.put(MailOptions.REFS, order++);
- }
- if (hasOption(SUBJECT_OPTION[0])) {
- patterns.add(MailProcessor.SUBJECT_PREFIX);
- patternOrder.put(MailOptions.SUBJECT, order++);
- }
- options.setStripQuotedText(hasOption(STRIP_QUOTED_OPTION[0]));
-
- options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
- options.setPatternOrder(patternOrder);
- options.setIncludeBody(hasOption(BODY_OPTION[0]));
-
- if (hasOption(SEPARATOR_OPTION[0])) {
- options.setSeparator(getOption(SEPARATOR_OPTION[0]));
- } else {
- options.setSeparator("\n");
- }
-
- if (hasOption(BODY_SEPARATOR_OPTION[0])) {
- options.setBodySeparator(getOption(BODY_SEPARATOR_OPTION[0]));
- }
-
- if (hasOption(QUOTED_REGEX_OPTION[0])) {
- options.setQuotedTextPattern(Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])));
- }
-
- if (getOption(DefaultOptionCreator.METHOD_OPTION,
- DefaultOptionCreator.MAPREDUCE_METHOD).equals(DefaultOptionCreator.SEQUENTIAL_METHOD)) {
- runSequential(options);
- } else {
- runMapReduce(getInputPath(), getOutputPath());
- }
-
- return 0;
- }
-
- private int runSequential(MailOptions options)
- throws IOException, InterruptedException, NoSuchMethodException {
-
- long start = System.currentTimeMillis();
- createSequenceFiles(options);
- long finish = System.currentTimeMillis();
- log.info("Conversion took {}ms", finish - start);
-
- return 0;
- }
-
- private int runMapReduce(Path input, Path output) throws IOException, InterruptedException, ClassNotFoundException {
-
- Job job = prepareJob(input, output, MultipleTextFileInputFormat.class, SequenceFilesFromMailArchivesMapper.class,
- Text.class, Text.class, SequenceFileOutputFormat.class, "SequenceFilesFromMailArchives");
-
- Configuration jobConfig = job.getConfiguration();
-
- if (hasOption(KEY_PREFIX_OPTION[0])) {
- jobConfig.set(KEY_PREFIX_OPTION[1], getOption(KEY_PREFIX_OPTION[0]));
- }
-
- int chunkSize = 64;
- if (hasOption(CHUNK_SIZE_OPTION[0])) {
- chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
- jobConfig.set(CHUNK_SIZE_OPTION[0], String.valueOf(chunkSize));
- }
-
- Charset charset;
- if (hasOption(CHARSET_OPTION[0])) {
- charset = Charset.forName(getOption(CHARSET_OPTION[0]));
- jobConfig.set(CHARSET_OPTION[0], charset.displayName());
- }
-
- if (hasOption(FROM_OPTION[0])) {
- jobConfig.set(FROM_OPTION[1], "true");
- }
-
- if (hasOption(TO_OPTION[0])) {
- jobConfig.set(TO_OPTION[1], "true");
- }
-
- if (hasOption(REFERENCES_OPTION[0])) {
- jobConfig.set(REFERENCES_OPTION[1], "true");
- }
-
- if (hasOption(SUBJECT_OPTION[0])) {
- jobConfig.set(SUBJECT_OPTION[1], "true");
- }
-
- if (hasOption(QUOTED_REGEX_OPTION[0])) {
- jobConfig.set(QUOTED_REGEX_OPTION[1], Pattern.compile(getOption(QUOTED_REGEX_OPTION[0])).toString());
- }
-
- if (hasOption(SEPARATOR_OPTION[0])) {
- jobConfig.set(SEPARATOR_OPTION[1], getOption(SEPARATOR_OPTION[0]));
- } else {
- jobConfig.set(SEPARATOR_OPTION[1], "\n");
- }
-
- if (hasOption(BODY_OPTION[0])) {
- jobConfig.set(BODY_OPTION[1], "true");
- } else {
- jobConfig.set(BODY_OPTION[1], "false");
- }
-
- if (hasOption(BODY_SEPARATOR_OPTION[0])) {
- jobConfig.set(BODY_SEPARATOR_OPTION[1], getOption(BODY_SEPARATOR_OPTION[0]));
- } else {
- jobConfig.set(BODY_SEPARATOR_OPTION[1], "\n");
- }
-
- FileSystem fs = FileSystem.get(jobConfig);
- FileStatus fsFileStatus = fs.getFileStatus(inputPath);
-
- jobConfig.set(BASE_INPUT_PATH, inputPath.toString());
- String inputDirList = HadoopUtil.buildDirList(fs, fsFileStatus);
- FileInputFormat.setInputPaths(job, inputDirList);
-
- long chunkSizeInBytes = chunkSize * 1024L * 1024;
- // need to set this to a multiple of the block size, or no split happens
- FileInputFormat.setMaxInputSplitSize(job, chunkSizeInBytes);
-
- // cap the number of split locations; very large combined splits otherwise trigger noisy warnings from the job client
- jobConfig.set("mapreduce.job.max.split.locations", String.valueOf(MAX_JOB_SPLIT_LOCATIONS));
-
- boolean succeeded = job.waitForCompletion(true);
- if (!succeeded) {
- return -1;
- }
- return 0;
- }
-}
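
For reference, a minimal sketch of how this (now removed) tool could be driven programmatically, mirroring its own main(); the paths are illustrative and the long option names are assumed to match the OPTION arrays registered in run() above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class MailArchivesConversionSketch {
  public static void main(String[] args) throws Exception {
    // Convert a tree of mbox archives into SequenceFiles of <key, text> pairs,
    // keeping subject and body, using the sequential (non-MapReduce) path.
    ToolRunner.run(new Configuration(), new SequenceFilesFromMailArchives(), new String[] {
        "--input", "/path/to/mail-archives",    // illustrative input directory
        "--output", "/path/to/sequence-files",  // illustrative output directory
        "--subject", "--body",                  // include subject header and body text
        "--method", "sequential"
    });
  }
}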
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java b/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
deleted file mode 100644
index 203e8fb..0000000
--- a/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.mahout.common.HadoopUtil;
-import org.apache.mahout.common.iterator.FileLineIterable;
-import org.apache.mahout.utils.email.MailOptions;
-import org.apache.mahout.utils.email.MailProcessor;
-
-import java.io.ByteArrayInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.charset.Charset;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION;
-import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION;
-
-/**
- * Mapper for the SequenceFilesFromMailArchives job.
- */
-public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
-
- private Text outKey = new Text();
- private Text outValue = new Text();
-
- private static final Pattern MESSAGE_START = Pattern.compile(
- "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
- private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile(
- "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
-
- private MailOptions options;
-
- @Override
- public void setup(Context context) throws IOException, InterruptedException {
-
- Configuration configuration = context.getConfiguration();
-
- // absorb all of the options into the MailOptions object
- this.options = new MailOptions();
-
- options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], ""));
-
- if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) {
- options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64));
- }
-
- if (!configuration.get(CHARSET_OPTION[0], "").equals("")) {
- Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8"));
- options.setCharset(charset);
- } else {
- Charset charset = Charset.forName("UTF-8");
- options.setCharset(charset);
- }
-
- List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
- // patternOrder records, for downstream consumers, the order in which the
- // matched fields appear in the output text, instead of encoding field labels
- // in the string itself, which would need extra processing to strip them out
- // before feature selection.
- Map<String, Integer> patternOrder = Maps.newHashMap();
- int order = 0;
- if (!configuration.get(FROM_OPTION[1], "").equals("")) {
- patterns.add(MailProcessor.FROM_PREFIX);
- patternOrder.put(MailOptions.FROM, order++);
- }
-
- if (!configuration.get(TO_OPTION[1], "").equals("")) {
- patterns.add(MailProcessor.TO_PREFIX);
- patternOrder.put(MailOptions.TO, order++);
- }
-
- if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) {
- patterns.add(MailProcessor.REFS_PREFIX);
- patternOrder.put(MailOptions.REFS, order++);
- }
-
- if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) {
- patterns.add(MailProcessor.SUBJECT_PREFIX);
- patternOrder.put(MailOptions.SUBJECT, order++);
- }
-
- options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false));
-
- options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
- options.setPatternOrder(patternOrder);
-
- options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false));
-
- options.setSeparator("\n");
- if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) {
- options.setSeparator(configuration.get(SEPARATOR_OPTION[1], ""));
- }
- if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) {
- options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], ""));
- }
- if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) {
- options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], "")));
- }
-
- }
-
- public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context)
- throws IOException, InterruptedException {
- long messageCount = 0;
- try {
- StringBuilder contents = new StringBuilder();
- StringBuilder body = new StringBuilder();
- Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
- Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
- String[] patternResults = new String[options.getPatternsToMatch().length];
- Matcher[] matches = new Matcher[options.getPatternsToMatch().length];
- for (int i = 0; i < matches.length; i++) {
- matches[i] = options.getPatternsToMatch()[i].matcher("");
- }
-
- String messageId = null;
- boolean inBody = false;
- Pattern quotedTextPattern = options.getQuotedTextPattern();
-
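- // Per-line flow over an illustrative mbox fragment (example lines assumed):
- //   "From dev@example.org Mon Jun 4 10:12:03 2018"  boundary: flush the finished message
- //   "Message-ID: <abc@host>"                        starts a new message (sets messageId)
- //   "Subject: hello"                                captured by one of the header patterns
- //   ""                                              first blank line after the id: body begins
- //   "body text ..."                                 appended when isIncludeBody() is true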
- for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) {
- if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) {
- for (int i = 0; i < matches.length; i++) {
- Matcher matcher = matches[i];
- matcher.reset(nextLine);
- if (matcher.matches()) {
- patternResults[i] = matcher.group(1);
- }
- }
-
- // only start appending body content after we've seen a message ID
- if (messageId != null) {
- // first, see if we hit the end of the message
- messageBoundaryMatcher.reset(nextLine);
- if (messageBoundaryMatcher.matches()) {
- // done parsing this message ... write it out
- String key = generateKey(filename, options.getPrefix(), messageId);
- // if this ordering changes, then also change
- // FromEmailToDictionaryMapper
- writeContent(options.getSeparator(), contents, body, patternResults);
-
- this.outKey.set(key);
- this.outValue.set(contents.toString());
- context.write(this.outKey, this.outValue);
- contents.setLength(0); // reset the buffer
- body.setLength(0);
- messageId = null;
- inBody = false;
- } else {
- if (inBody && options.isIncludeBody()) {
- if (!nextLine.isEmpty()) {
- body.append(nextLine).append(options.getBodySeparator());
- }
- } else {
- // first empty line we see after reading the message Id
- // indicates that we are in the body ...
- inBody = nextLine.isEmpty();
- }
- }
- } else {
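- // 14 is just below the length of the shortest plausible "Message-ID: <x>" line (15 chars)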
- if (nextLine.length() > 14) {
- messageIdMatcher.reset(nextLine);
- if (messageIdMatcher.matches()) {
- messageId = messageIdMatcher.group(1);
- ++messageCount;
- }
- }
- }
- }
- }
- // write the last message in the file if available
- if (messageId != null) {
- String key = generateKey(filename, options.getPrefix(), messageId);
- writeContent(options.getSeparator(), contents, body, patternResults);
- this.outKey.set(key);
- this.outValue.set(contents.toString());
- context.write(this.outKey, this.outValue);
- contents.setLength(0); // reset the buffer
- }
- } catch (FileNotFoundException ignored) {
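- // a vanished input file simply contributes zero messages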
-
- }
- return messageCount;
- }
-
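- // Illustrative: generateKey("TREC/mbox-1", "asf", "abc@host") returns
- // "asf/TREC/mbox-1/abc@host" (Path.SEPARATOR is "/").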
- protected static String generateKey(String mboxFilename, String prefix, String messageId) {
- return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator());
- }
-
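- // Illustrative: with separator "\n", matches {"alice@a.org", "hello"} and body
- // "hi\n", contents becomes "alice@a.org\nhello\nhi\n".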
- private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) {
- String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator());
- contents.append(matchesString).append(separator).append(body);
- }
-
- @Override
- public void map(IntWritable key, BytesWritable value, Context context)
- throws IOException, InterruptedException {
- Configuration configuration = context.getConfiguration();
- Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
- String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
- ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
- parseMailboxLineByLine(relativeFilePath, is, context);
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java b/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
deleted file mode 100644
index cacfd22..0000000
--- a/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.mahout.common.AbstractJob;
-
-import java.io.IOException;
-
-public class TextParagraphSplittingJob extends AbstractJob {
-
- @Override
- public int run(String[] strings) throws Exception {
- Configuration originalConf = getConf();
- Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")),
- new Path(originalConf.get("mapred.output.dir")),
- SequenceFileInputFormat.class,
- SplitMap.class,
- Text.class,
- Text.class,
- Reducer.class,
- Text.class,
- Text.class,
- SequenceFileOutputFormat.class);
- job.setNumReduceTasks(0);
- boolean succeeded = job.waitForCompletion(true);
- return succeeded ? 0 : -1;
- }
-
- public static class SplitMap extends Mapper<Text,Text,Text,Text> {
-
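- // Splits each input value on blank lines. Illustrative: "para1\n\npara2\n\npara3"
- // is emitted as three records sharing the input key: "para1", "\n\npara2",
- // "\n\npara3" (the separator stays attached to every paragraph after the first).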
- @Override
- protected void map(Text key, Text text, Context context) throws IOException, InterruptedException {
- Text outText = new Text();
- int loc = 0;
- while (loc >= 0 && loc < text.getLength()) {
- int nextLoc = text.find("\n\n", loc + 1);
- if (nextLoc > 0) {
- outText.set(text.getBytes(), loc, nextLoc - loc);
- context.write(key, outText);
- } else {
- // no further blank line: emit the trailing paragraph instead of silently dropping it
- outText.set(text.getBytes(), loc, text.getLength() - loc);
- context.write(key, outText);
- }
- loc = nextLoc;
- }
- }
- }
-
- public static void main(String[] args) throws Exception {
- ToolRunner.run(new TextParagraphSplittingJob(), args);
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java b/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
deleted file mode 100644
index b8441b7..0000000
--- a/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.text;
-
-import java.io.IOException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-
-import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION;
-
-/**
- * RecordReader used with the MultipleTextFileInputFormat class to read full files as
- * k/v pairs and groups of files as single input splits.
- */
-public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> {
-
- private FileSplit fileSplit;
- private boolean processed = false;
- private Configuration configuration;
- private BytesWritable value = new BytesWritable();
- private IntWritable index;
- private String fileFilterClassName = null;
- private PathFilter pathFilter = null;
-
- public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
- throws IOException {
- this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx),
- fileSplit.getLength(idx), fileSplit.getLocations());
- this.configuration = taskAttemptContext.getConfiguration();
- this.index = new IntWritable(idx);
- this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]);
- }
-
- @Override
- public IntWritable getCurrentKey() {
- return index;
- }
-
- @Override
- public BytesWritable getCurrentValue() {
- return value;
- }
-
- @Override
- public float getProgress() throws IOException {
- return processed ? 1.0f : 0.0f;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- if (!StringUtils.isBlank(fileFilterClassName) &&
- !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
- try {
- pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- throw new IllegalStateException(e);
- }
- }
- }
-
- @Override
- public boolean nextKeyValue() throws IOException {
- if (!processed) {
- byte[] contents = new byte[(int) fileSplit.getLength()];
- Path file = fileSplit.getPath();
- FileSystem fs = file.getFileSystem(this.configuration);
-
- if (!fs.isFile(file)) {
- return false;
- }
-
- FileStatus[] fileStatuses;
- if (pathFilter != null) {
- fileStatuses = fs.listStatus(file, pathFilter);
- } else {
- fileStatuses = fs.listStatus(file);
- }
-
- if (fileStatuses.length == 1) {
- try (FSDataInputStream in = fs.open(fileStatuses[0].getPath())) {
- IOUtils.readFully(in, contents, 0, contents.length);
- value.setCapacity(contents.length);
- value.set(contents, 0, contents.length);
- }
- processed = true;
- return true;
- }
- }
- return false;
- }
-
- @Override
- public void close() throws IOException {
- }
-}
\ No newline at end of file
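
A reader like this is typically instantiated once per file in a combined split by Hadoop's CombineFileRecordReader, whose (split, context, index) reflection contract the constructor above follows. A minimal sketch of that wiring, under the assumption that the MultipleTextFileInputFormat referenced by the mapper earlier does essentially this:

import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

// Sketch: pack many small files into combined splits, then hand each file
// (by index) to a WholeFileRecordReader that reads it whole as one record.
public class WholeFileCombineInputFormat extends CombineFileInputFormat<IntWritable, BytesWritable> {
  @Override
  public RecordReader<IntWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException {
    return new CombineFileRecordReader<>((CombineFileSplit) split, context, WholeFileRecordReader.class);
  }
}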