You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:52:13 UTC
[45/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045
Delete directories which were moved/no longer in use
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
new file mode 100644
index 0000000..203e8fb
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchivesMapper.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.utils.email.MailOptions;
+import org.apache.mahout.utils.email.MailProcessor;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.BODY_SEPARATOR_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHARSET_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.CHUNK_SIZE_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.FROM_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.KEY_PREFIX_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.QUOTED_REGEX_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.REFERENCES_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.SEPARATOR_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.STRIP_QUOTED_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.SUBJECT_OPTION;
+import static org.apache.mahout.text.SequenceFilesFromMailArchives.TO_OPTION;
+
+/**
+ * Map Class for the SequenceFilesFromMailArchives job
+ */
+public class SequenceFilesFromMailArchivesMapper extends Mapper<IntWritable, BytesWritable, Text, Text> {
+
+ private Text outKey = new Text();
+ private Text outValue = new Text();
+
+ private static final Pattern MESSAGE_START = Pattern.compile(
+ "^From \\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
+ private static final Pattern MESSAGE_ID_PREFIX = Pattern.compile(
+ "^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
+
+ private MailOptions options;
+
+ @Override
+ public void setup(Context context) throws IOException, InterruptedException {
+
+ Configuration configuration = context.getConfiguration();
+
+ // absorb all of the options into the MailOptions object
+ this.options = new MailOptions();
+
+ options.setPrefix(configuration.get(KEY_PREFIX_OPTION[1], ""));
+
+ if (!configuration.get(CHUNK_SIZE_OPTION[0], "").equals("")) {
+ options.setChunkSize(configuration.getInt(CHUNK_SIZE_OPTION[0], 64));
+ }
+
+ if (!configuration.get(CHARSET_OPTION[0], "").equals("")) {
+ Charset charset = Charset.forName(configuration.get(CHARSET_OPTION[0], "UTF-8"));
+ options.setCharset(charset);
+ } else {
+ Charset charset = Charset.forName("UTF-8");
+ options.setCharset(charset);
+ }
+
+ List<Pattern> patterns = Lists.newArrayListWithCapacity(5);
+ // patternOrder is used downstream so that we can know what order the
+ // text is in instead
+ // of encoding it in the string, which
+ // would require more processing later to remove it pre feature
+ // selection.
+ Map<String, Integer> patternOrder = Maps.newHashMap();
+ int order = 0;
+ if (!configuration.get(FROM_OPTION[1], "").equals("")) {
+ patterns.add(MailProcessor.FROM_PREFIX);
+ patternOrder.put(MailOptions.FROM, order++);
+ }
+
+ if (!configuration.get(TO_OPTION[1], "").equals("")) {
+ patterns.add(MailProcessor.TO_PREFIX);
+ patternOrder.put(MailOptions.TO, order++);
+ }
+
+ if (!configuration.get(REFERENCES_OPTION[1], "").equals("")) {
+ patterns.add(MailProcessor.REFS_PREFIX);
+ patternOrder.put(MailOptions.REFS, order++);
+ }
+
+ if (!configuration.get(SUBJECT_OPTION[1], "").equals("")) {
+ patterns.add(MailProcessor.SUBJECT_PREFIX);
+ patternOrder.put(MailOptions.SUBJECT, order += 1);
+ }
+
+ options.setStripQuotedText(configuration.getBoolean(STRIP_QUOTED_OPTION[1], false));
+
+ options.setPatternsToMatch(patterns.toArray(new Pattern[patterns.size()]));
+ options.setPatternOrder(patternOrder);
+
+ options.setIncludeBody(configuration.getBoolean(BODY_OPTION[1], false));
+
+ options.setSeparator("\n");
+ if (!configuration.get(SEPARATOR_OPTION[1], "").equals("")) {
+ options.setSeparator(configuration.get(SEPARATOR_OPTION[1], ""));
+ }
+ if (!configuration.get(BODY_SEPARATOR_OPTION[1], "").equals("")) {
+ options.setBodySeparator(configuration.get(BODY_SEPARATOR_OPTION[1], ""));
+ }
+ if (!configuration.get(QUOTED_REGEX_OPTION[1], "").equals("")) {
+ options.setQuotedTextPattern(Pattern.compile(configuration.get(QUOTED_REGEX_OPTION[1], "")));
+ }
+
+ }
+
+ public long parseMailboxLineByLine(String filename, InputStream mailBoxInputStream, Context context)
+ throws IOException, InterruptedException {
+ long messageCount = 0;
+ try {
+ StringBuilder contents = new StringBuilder();
+ StringBuilder body = new StringBuilder();
+ Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
+ Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
+ String[] patternResults = new String[options.getPatternsToMatch().length];
+ Matcher[] matches = new Matcher[options.getPatternsToMatch().length];
+ for (int i = 0; i < matches.length; i++) {
+ matches[i] = options.getPatternsToMatch()[i].matcher("");
+ }
+
+ String messageId = null;
+ boolean inBody = false;
+ Pattern quotedTextPattern = options.getQuotedTextPattern();
+
+ for (String nextLine : new FileLineIterable(mailBoxInputStream, options.getCharset(), false, filename)) {
+ if (!options.isStripQuotedText() || !quotedTextPattern.matcher(nextLine).find()) {
+ for (int i = 0; i < matches.length; i++) {
+ Matcher matcher = matches[i];
+ matcher.reset(nextLine);
+ if (matcher.matches()) {
+ patternResults[i] = matcher.group(1);
+ }
+ }
+
+ // only start appending body content after we've seen a message ID
+ if (messageId != null) {
+ // first, see if we hit the end of the message
+ messageBoundaryMatcher.reset(nextLine);
+ if (messageBoundaryMatcher.matches()) {
+ // done parsing this message ... write it out
+ String key = generateKey(filename, options.getPrefix(), messageId);
+ // if this ordering changes, then also change
+ // FromEmailToDictionaryMapper
+ writeContent(options.getSeparator(), contents, body, patternResults);
+
+ this.outKey.set(key);
+ this.outValue.set(contents.toString());
+ context.write(this.outKey, this.outValue);
+ contents.setLength(0); // reset the buffer
+ body.setLength(0);
+ messageId = null;
+ inBody = false;
+ } else {
+ if (inBody && options.isIncludeBody()) {
+ if (!nextLine.isEmpty()) {
+ body.append(nextLine).append(options.getBodySeparator());
+ }
+ } else {
+ // first empty line we see after reading the message Id
+ // indicates that we are in the body ...
+ inBody = nextLine.isEmpty();
+ }
+ }
+ } else {
+ if (nextLine.length() > 14) {
+ messageIdMatcher.reset(nextLine);
+ if (messageIdMatcher.matches()) {
+ messageId = messageIdMatcher.group(1);
+ ++messageCount;
+ }
+ }
+ }
+ }
+ }
+ // write the last message in the file if available
+ if (messageId != null) {
+ String key = generateKey(filename, options.getPrefix(), messageId);
+ writeContent(options.getSeparator(), contents, body, patternResults);
+ this.outKey.set(key);
+ this.outValue.set(contents.toString());
+ context.write(this.outKey, this.outValue);
+ contents.setLength(0); // reset the buffer
+ }
+ } catch (FileNotFoundException ignored) {
+
+ }
+ return messageCount;
+ }
+
+ protected static String generateKey(String mboxFilename, String prefix, String messageId) {
+ return Joiner.on(Path.SEPARATOR).join(Lists.newArrayList(prefix, mboxFilename, messageId).iterator());
+ }
+
+ private static void writeContent(String separator, StringBuilder contents, CharSequence body, String[] matches) {
+ String matchesString = Joiner.on(separator).useForNull("").join(Arrays.asList(matches).iterator());
+ contents.append(matchesString).append(separator).append(body);
+ }
+
+ public void map(IntWritable key, BytesWritable value, Context context)
+ throws IOException, InterruptedException {
+ Configuration configuration = context.getConfiguration();
+ Path filePath = ((CombineFileSplit) context.getInputSplit()).getPath(key.get());
+ String relativeFilePath = HadoopUtil.calcRelativeFilePath(configuration, filePath);
+ ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
+ parseMailboxLineByLine(relativeFilePath, is, context);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
new file mode 100644
index 0000000..cacfd22
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+
+import java.io.IOException;
+
+public class TextParagraphSplittingJob extends AbstractJob {
+
+ @Override
+ public int run(String[] strings) throws Exception {
+ Configuration originalConf = getConf();
+ Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")),
+ new Path(originalConf.get("mapred.output.dir")),
+ SequenceFileInputFormat.class,
+ SplitMap.class,
+ Text.class,
+ Text.class,
+ Reducer.class,
+ Text.class,
+ Text.class,
+ SequenceFileOutputFormat.class);
+ job.setNumReduceTasks(0);
+ boolean succeeded = job.waitForCompletion(true);
+ return succeeded ? 0 : -1;
+ }
+
+ public static class SplitMap extends Mapper<Text,Text,Text,Text> {
+
+ @Override
+ protected void map(Text key, Text text, Context context) throws IOException, InterruptedException {
+ Text outText = new Text();
+ int loc = 0;
+ while (loc >= 0 && loc < text.getLength()) {
+ int nextLoc = text.find("\n\n", loc + 1);
+ if (nextLoc > 0) {
+ outText.set(text.getBytes(), loc, nextLoc - loc);
+ context.write(key, outText);
+ }
+ loc = nextLoc;
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new TextParagraphSplittingJob(), args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
new file mode 100644
index 0000000..b8441b7
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WholeFileRecordReader.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+import static org.apache.mahout.text.SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION;
+
+/**
+ * RecordReader used with the MultipleTextFileInputFormat class to read full files as
+ * k/v pairs and groups of files as single input splits.
+ */
+public class WholeFileRecordReader extends RecordReader<IntWritable, BytesWritable> {
+
+ private FileSplit fileSplit;
+ private boolean processed = false;
+ private Configuration configuration;
+ private BytesWritable value = new BytesWritable();
+ private IntWritable index;
+ private String fileFilterClassName = null;
+ private PathFilter pathFilter = null;
+
+ public WholeFileRecordReader(CombineFileSplit fileSplit, TaskAttemptContext taskAttemptContext, Integer idx)
+ throws IOException {
+ this.fileSplit = new FileSplit(fileSplit.getPath(idx), fileSplit.getOffset(idx),
+ fileSplit.getLength(idx), fileSplit.getLocations());
+ this.configuration = taskAttemptContext.getConfiguration();
+ this.index = new IntWritable(idx);
+ this.fileFilterClassName = this.configuration.get(FILE_FILTER_CLASS_OPTION[0]);
+ }
+
+ @Override
+ public IntWritable getCurrentKey() {
+ return index;
+ }
+
+ @Override
+ public BytesWritable getCurrentValue() {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ return processed ? 1.0f : 0.0f;
+ }
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ if (!StringUtils.isBlank(fileFilterClassName) &&
+ !PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
+ try {
+ pathFilter = (PathFilter) Class.forName(fileFilterClassName).newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException {
+ if (!processed) {
+ byte[] contents = new byte[(int) fileSplit.getLength()];
+ Path file = fileSplit.getPath();
+ FileSystem fs = file.getFileSystem(this.configuration);
+
+ if (!fs.isFile(file)) {
+ return false;
+ }
+
+ FileStatus[] fileStatuses;
+ if (pathFilter != null) {
+ fileStatuses = fs.listStatus(file, pathFilter);
+ } else {
+ fileStatuses = fs.listStatus(file);
+ }
+
+ if (fileStatuses.length == 1) {
+ try (FSDataInputStream in = fs.open(fileStatuses[0].getPath())) {
+ IOUtils.readFully(in, contents, 0, contents.length);
+ value.setCapacity(contents.length);
+ value.set(contents, 0, contents.length);
+ }
+ processed = true;
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
new file mode 100644
index 0000000..bed4640
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.text.wikipedia.WikipediaMapper;
+import org.apache.mahout.text.wikipedia.XmlInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create and run the Wikipedia Dataset Creator.
+ */
+public final class WikipediaToSequenceFile {
+
+ private static final Logger log = LoggerFactory.getLogger(WikipediaToSequenceFile.class);
+
+ private WikipediaToSequenceFile() { }
+
+ /**
+ * Takes in two arguments:
+ * <ol>
+ * <li>The input {@link org.apache.hadoop.fs.Path} where the input documents live</li>
+ * <li>The output {@link org.apache.hadoop.fs.Path} where to write the classifier as a
+ * {@link org.apache.hadoop.io.SequenceFile}</li>
+ * </ol>
+ */
+ public static void main(String[] args) throws IOException {
+ DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+ ArgumentBuilder abuilder = new ArgumentBuilder();
+ GroupBuilder gbuilder = new GroupBuilder();
+
+ Option dirInputPathOpt = DefaultOptionCreator.inputOption().create();
+
+ Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create();
+
+ Option categoriesOpt = obuilder.withLongName("categories").withArgument(
+ abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription(
+ "Location of the categories file. One entry per line. "
+ + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create();
+
+ Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription(
+ "If set, then the category name must exactly match the "
+ + "entry in the categories file. Default is false").withShortName("e").create();
+
+ Option allOpt = obuilder.withLongName("all")
+ .withDescription("If set, Select all files. Default is false").withShortName("all").create();
+
+ Option removeLabelOpt = obuilder.withLongName("removeLabels")
+ .withDescription("If set, remove [[Category:labels]] from document text after extracting label."
+ + "Default is false").withShortName("rl").create();
+
+ Option helpOpt = DefaultOptionCreator.helpOption();
+
+ Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt)
+ .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(allOpt).withOption(helpOpt)
+ .withOption(removeLabelOpt).create();
+
+ Parser parser = new Parser();
+ parser.setGroup(group);
+ parser.setHelpOption(helpOpt);
+ try {
+ CommandLine cmdLine = parser.parse(args);
+ if (cmdLine.hasOption(helpOpt)) {
+ CommandLineUtil.printHelp(group);
+ return;
+ }
+
+ String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+ String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+
+ String catFile = "";
+ if (cmdLine.hasOption(categoriesOpt)) {
+ catFile = (String) cmdLine.getValue(categoriesOpt);
+ }
+
+ boolean all = false;
+ if (cmdLine.hasOption(allOpt)) {
+ all = true;
+ }
+
+ boolean removeLabels = false;
+ if (cmdLine.hasOption(removeLabelOpt)) {
+ removeLabels = true;
+ }
+
+ runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), all, removeLabels);
+ } catch (OptionException | InterruptedException | ClassNotFoundException e) {
+ log.error("Exception", e);
+ CommandLineUtil.printHelp(group);
+ }
+ }
+
+ /**
+ * Run the job
+ *
+ * @param input
+ * the input pathname String
+ * @param output
+ * the output pathname String
+ * @param catFile
+ * the file containing the Wikipedia categories
+ * @param exactMatchOnly
+ * if true, then the Wikipedia category must match exactly instead of simply containing the
+ * category string
+ * @param all
+ * if true select all categories
+ * @param removeLabels
+ * if true remove Category labels from document text after extracting.
+ *
+ */
+ public static void runJob(String input,
+ String output,
+ String catFile,
+ boolean exactMatchOnly,
+ boolean all,
+ boolean removeLabels) throws IOException, InterruptedException, ClassNotFoundException {
+ Configuration conf = new Configuration();
+ conf.set("xmlinput.start", "<page>");
+ conf.set("xmlinput.end", "</page>");
+ conf.setBoolean("exact.match.only", exactMatchOnly);
+ conf.setBoolean("all.files", all);
+ conf.setBoolean("remove.labels", removeLabels);
+ conf.set("io.serializations",
+ "org.apache.hadoop.io.serializer.JavaSerialization,"
+ + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+ Set<String> categories = new HashSet<>();
+ if (!catFile.isEmpty()) {
+ for (String line : new FileLineIterable(new File(catFile))) {
+ categories.add(line.trim().toLowerCase(Locale.ENGLISH));
+ }
+ }
+
+ Stringifier<Set<String>> setStringifier =
+ new DefaultStringifier<>(conf, GenericsUtil.getClass(categories));
+
+ String categoriesStr = setStringifier.toString(categories);
+ conf.set("wikipedia.categories", categoriesStr);
+
+ Job job = new Job(conf);
+ log.info("Input: {} Out: {} Categories: {} All Files: {}", input, output, catFile, all);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ FileInputFormat.setInputPaths(job, new Path(input));
+ Path outPath = new Path(output);
+ FileOutputFormat.setOutputPath(job, outPath);
+ job.setMapperClass(WikipediaMapper.class);
+ job.setInputFormatClass(XmlInputFormat.class);
+ job.setReducerClass(Reducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setJarByClass(WikipediaToSequenceFile.class);
+
+ /*
+ * conf.set("mapred.compress.map.output", "true"); conf.set("mapred.map.output.compression.type",
+ * "BLOCK"); conf.set("mapred.output.compress", "true"); conf.set("mapred.output.compression.type",
+ * "BLOCK"); conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+ */
+ HadoopUtil.delete(conf, outPath);
+
+ boolean succeeded = job.waitForCompletion(true);
+ if (!succeeded) {
+ throw new IllegalStateException("Job failed!");
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java
new file mode 100644
index 0000000..d50323d
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaAnalyzer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.Tokenizer;
+import org.apache.lucene.analysis.core.LowerCaseFilter;
+import org.apache.lucene.analysis.core.StopAnalyzer;
+import org.apache.lucene.analysis.core.StopFilter;
+import org.apache.lucene.analysis.standard.StandardFilter;
+import org.apache.lucene.analysis.util.CharArraySet;
+import org.apache.lucene.analysis.util.StopwordAnalyzerBase;
+import org.apache.lucene.analysis.wikipedia.WikipediaTokenizer;
+
+
+public class WikipediaAnalyzer extends StopwordAnalyzerBase {
+
+ public WikipediaAnalyzer() {
+ super(StopAnalyzer.ENGLISH_STOP_WORDS_SET);
+ }
+
+ public WikipediaAnalyzer(CharArraySet stopSet) {
+ super(stopSet);
+ }
+
+ @Override
+ protected TokenStreamComponents createComponents(String fieldName) {
+ Tokenizer tokenizer = new WikipediaTokenizer();
+ TokenStream result = new StandardFilter(tokenizer);
+ result = new LowerCaseFilter(result);
+ result = new StopFilter(result, getStopwordSet());
+ return new TokenStreamComponents(tokenizer, result);
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
new file mode 100644
index 0000000..8214407
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Stringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Command-line driver for the Wikipedia dataset creator job. It selects Wikipedia pages whose
+ * {@code [[Category:...]]} tags match an entry of a categories file. Each selected page is written,
+ * after analysis, as text keyed by the matched category.
+ */
+public final class WikipediaDatasetCreatorDriver {
+  private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorDriver.class);
+
+  private WikipediaDatasetCreatorDriver() { }
+
+  /**
+   * Parses the command line and runs the job. Options:
+   * <ol>
+   * <li>input ({@link DefaultOptionCreator#inputOption()}): the {@link org.apache.hadoop.fs.Path}
+   * where the input documents live</li>
+   * <li>output ({@link DefaultOptionCreator#outputOption()}): the {@link org.apache.hadoop.fs.Path}
+   * where the job writes its plain-text output ({@link TextOutputFormat})</li>
+   * <li>{@code --categories}/{@code -c}: local file with one category per line (required)</li>
+   * <li>{@code --exactMatch}/{@code -e}: require exact category matches</li>
+   * <li>{@code --analyzer}/{@code -a}: Lucene analyzer class with a no-argument constructor</li>
+   * </ol>
+   * On a parse error or an unknown analyzer class, the error is logged and the help text is printed.
+   */
+  public static void main(String[] args) throws IOException, InterruptedException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+
+    Option dirInputPathOpt = DefaultOptionCreator.inputOption().create();
+
+    Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create();
+
+    Option categoriesOpt = obuilder.withLongName("categories").withRequired(true).withArgument(
+        abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription(
+        "Location of the categories file. One entry per line. "
+            + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create();
+
+    Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription(
+        "If set, then the category name must exactly match the "
+            + "entry in the categories file. Default is false").withShortName("e").create();
+    Option analyzerOpt = obuilder.withLongName("analyzer").withRequired(false).withArgument(
+        abuilder.withName("analyzer").withMinimum(1).withMaximum(1).create()).withDescription(
+        "The analyzer to use, must have a no argument constructor").withShortName("a").create();
+    Option helpOpt = DefaultOptionCreator.helpOption();
+
+    Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt)
+        .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(analyzerOpt).withOption(helpOpt)
+        .create();
+
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    try {
+      CommandLine cmdLine = parser.parse(args);
+      if (cmdLine.hasOption(helpOpt)) {
+        CommandLineUtil.printHelp(group);
+        return;
+      }
+
+      String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+      String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+      String catFile = (String) cmdLine.getValue(categoriesOpt);
+      Class<? extends Analyzer> analyzerClass = WikipediaAnalyzer.class;
+      if (cmdLine.hasOption(analyzerOpt)) {
+        String className = cmdLine.getValue(analyzerOpt).toString();
+        analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+        // Fail fast: there is no point configuring an analyzer that cannot be instantiated.
+        ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+      }
+      runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt), analyzerClass);
+    } catch (OptionException | ClassNotFoundException e) {
+      log.error("Exception", e);
+      CommandLineUtil.printHelp(group);
+    }
+  }
+
+  /**
+   * Configures and runs the dataset-creator job, blocking until it completes.
+   *
+   * @param input
+   *          the input pathname String
+   * @param output
+   *          the output pathname String; existing output at this path is deleted before the job runs
+   * @param catFile
+   *          local file containing the Wikipedia categories, one per line. Entries are trimmed and
+   *          lower-cased; blank lines are ignored.
+   * @param exactMatchOnly
+   *          if true, then the Wikipedia category must match exactly instead of simply containing the
+   *          category string
+   * @param analyzerClass
+   *          Lucene analyzer whose class name is passed to the mappers as {@code analyzer.class}
+   * @throws IllegalStateException if the job does not complete successfully
+   */
+  public static void runJob(String input,
+                            String output,
+                            String catFile,
+                            boolean exactMatchOnly,
+                            Class<? extends Analyzer> analyzerClass)
+    throws IOException, InterruptedException, ClassNotFoundException {
+    Configuration conf = new Configuration();
+    conf.set("key.value.separator.in.input.line", " ");
+    conf.set("xmlinput.start", "<page>");
+    conf.set("xmlinput.end", "</page>");
+    conf.setBoolean("exact.match.only", exactMatchOnly);
+    conf.set("analyzer.class", analyzerClass.getName());
+    // Do not drop JavaSerialization. The category set (a java.util.Set, not a Writable) is stored
+    // in the configuration below through DefaultStringifier, and that needs this serializer.
+    conf.set("io.serializations",
+             "org.apache.hadoop.io.serializer.JavaSerialization,"
+             + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+    Set<String> categories = new HashSet<>();
+    for (String line : new FileLineIterable(new File(catFile))) {
+      String category = line.trim().toLowerCase(Locale.ENGLISH);
+      // Skip blank lines. An empty category would match nearly every page when matching inexactly.
+      if (!category.isEmpty()) {
+        categories.add(category);
+      }
+    }
+
+    Stringifier<Set<String>> setStringifier =
+        new DefaultStringifier<>(conf, GenericsUtil.getClass(categories));
+
+    String categoriesStr = setStringifier.toString(categories);
+
+    conf.set("wikipedia.categories", categoriesStr);
+
+    Job job = new Job(conf);
+    log.info("Input: {} Out: {} Categories: {}", input, output, catFile);
+    job.setJarByClass(WikipediaDatasetCreatorDriver.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    job.setMapperClass(WikipediaDatasetCreatorMapper.class);
+    //TODO: job.setNumMapTasks(100);
+    job.setInputFormatClass(XmlInputFormat.class);
+    job.setReducerClass(WikipediaDatasetCreatorReducer.class);
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    FileInputFormat.setInputPaths(job, new Path(input));
+    Path outPath = new Path(output);
+    FileOutputFormat.setOutputPath(job, outPath);
+    HadoopUtil.delete(conf, outPath);
+
+    boolean succeeded = job.waitForCompletion(true);
+    if (!succeeded) {
+      throw new IllegalStateException("Job failed!");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
new file mode 100644
index 0000000..50e5f37
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.common.ClassUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Maps over Wikipedia XML pages and emits every page having one of the categories listed in the
+ * input categories file.
+ * <ul>
+ * <li>Key: the matched category, with whitespace and non-word characters replaced by {@code _}.</li>
+ * <li>Value: the page text run through the configured Lucene analyzer, each token followed by a
+ * space.</li>
+ * </ul>
+ */
+public class WikipediaDatasetCreatorMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorMapper.class);
+
+  private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s\\W]");
+  private static final Pattern OPEN_TEXT_TAG_PATTERN = Pattern.compile("<text xml:space=\"preserve\">");
+  private static final Pattern CLOSE_TEXT_TAG_PATTERN = Pattern.compile("</text>");
+
+  /** Configured categories; parallel to {@link #inputCategoryPatterns}. */
+  private List<String> inputCategories;
+  /** Word-boundary patterns for inexact matching, one per entry of {@link #inputCategories}. */
+  private List<Pattern> inputCategoryPatterns;
+  private boolean exactMatchOnly;
+  private Analyzer analyzer;
+
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    String document = value.toString();
+    // Strip the <text> wrapper tags, then decode HTML entities.
+    document = StringEscapeUtils.unescapeHtml4(CLOSE_TEXT_TAG_PATTERN.matcher(
+        OPEN_TEXT_TAG_PATTERN.matcher(document).replaceFirst("")).replaceAll(""));
+    String catMatch = findMatchingCategory(document);
+    if (!"Unknown".equals(catMatch)) {
+      StringBuilder contents = new StringBuilder(1000);
+      TokenStream stream = analyzer.tokenStream(catMatch, new StringReader(document));
+      try {
+        CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
+        stream.reset();
+        while (stream.incrementToken()) {
+          contents.append(termAtt.buffer(), 0, termAtt.length()).append(' ');
+        }
+        stream.end();
+      } finally {
+        // Close even if tokenization fails, so the stream is always released.
+        Closeables.close(stream, true);
+      }
+      context.write(
+          new Text(SPACE_NON_ALPHA_PATTERN.matcher(catMatch).replaceAll("_")),
+          new Text(contents.toString()));
+    }
+  }
+
+  /**
+   * Reads the category set, the exact-match flag and the analyzer class from the job configuration.
+   */
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+
+    Configuration conf = context.getConfiguration();
+
+    if (inputCategories == null) {
+      Set<String> newCategories = new HashSet<>();
+      DefaultStringifier<Set<String>> setStringifier =
+          new DefaultStringifier<>(conf, GenericsUtil.getClass(newCategories));
+      String categoriesStr = conf.get("wikipedia.categories", setStringifier.toString(newCategories));
+      Set<String> inputCategoriesSet = setStringifier.fromString(categoriesStr);
+      inputCategories = new ArrayList<>(inputCategoriesSet.size());
+      inputCategoryPatterns = new ArrayList<>(inputCategoriesSet.size());
+      for (String inputCategory : inputCategoriesSet) {
+        if (inputCategory.isEmpty()) {
+          // An empty entry yields ".*\b\b.*", which matches almost any category.
+          continue;
+        }
+        inputCategories.add(inputCategory);
+        // Categories are literal names: quote them so regex metacharacters (e.g. "c++") are safe.
+        inputCategoryPatterns.add(Pattern.compile(".*\\b" + Pattern.quote(inputCategory) + "\\b.*"));
+      }
+    }
+
+    exactMatchOnly = conf.getBoolean("exact.match.only", false);
+
+    if (analyzer == null) {
+      String analyzerStr = conf.get("analyzer.class", WikipediaAnalyzer.class.getName());
+      analyzer = ClassUtils.instantiateAs(analyzerStr, Analyzer.class);
+    }
+
+    log.info("Configure: Input Categories size: {} Exact Match: {} Analyzer: {}",
+             inputCategories.size(), exactMatchOnly, analyzer.getClass().getName());
+  }
+
+  /**
+   * Scans the document's {@code [[Category:...]]} tags in order.
+   *
+   * @return for the first tag that matches (exactly, or as a whole word when inexact), either the
+   *         lower-cased tag (exact mode) or the configured category (inexact mode); otherwise
+   *         {@code "Unknown"}
+   */
+  private String findMatchingCategory(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+      categoryIndex += 11; // length of "[[Category:"
+      int endIndex = document.indexOf("]]", categoryIndex);
+      if (endIndex < 0) {
+        break;
+      }
+      String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim();
+      if (exactMatchOnly && inputCategories.contains(category)) {
+        return category;
+      }
+      if (!exactMatchOnly) {
+        for (int i = 0; i < inputCategories.size(); i++) {
+          if (inputCategoryPatterns.get(i).matcher(category).matches()) { // inexact match with word boundary.
+            return inputCategories.get(i);
+          }
+        }
+      }
+      startIndex = endIndex;
+    }
+    return "Unknown";
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
new file mode 100644
index 0000000..bf921fc
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Identity reducer: every (key, value) pair is written through unchanged. Its input and output
+ * types are identical, so it can also be used as a local Combiner.
+ */
+public class WikipediaDatasetCreatorReducer extends Reducer<Text, Text, Text, Text> {
+
+  @Override
+  protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    // With WikipediaDatasetCreatorMapper, the key is the matched category label and each value is
+    // one analyzed document. Grouping by the shuffle is the only transformation; values pass as-is.
+    for (Text value : values) {
+      context.write(key, value);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
new file mode 100644
index 0000000..abd3a04
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps over Wikipedia XML pages and skips redirect pages. A remaining page is emitted when one of
+ * its {@code [[Category:...]]} tags matches a configured input category. When {@code all.files} is
+ * set, every remaining page is emitted, and unmatched pages get the label {@code unknown}.
+ * <ul>
+ * <li>Key: {@code /category/title}, with whitespace in the title replaced by {@code _}.</li>
+ * <li>Value: the page text with HTML entities decoded. When {@code remove.labels} is set, its
+ * category tags are also stripped.</li>
+ * </ul>
+ */
+public class WikipediaMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaMapper.class);
+
+  private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s]");
+
+  /**
+   * Opening {@code <text>} tag. Matches {@code <text xml:space="preserve">} as well as variants
+   * carrying other or additional attributes.
+   */
+  private static final Pattern START_DOC = Pattern.compile("<text(?:\\s[^>]*)?>");
+
+  private static final String END_DOC = "</text>";
+
+  private static final Pattern TITLE = Pattern.compile("<title>(.*)<\\/title>");
+
+  private static final String REDIRECT = "<redirect />";
+
+  private Set<String> inputCategories;
+
+  private boolean exactMatchOnly;
+
+  private boolean all;
+
+  private boolean removeLabels;
+
+  @Override
+  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+    String content = value.toString();
+    if (content.contains(REDIRECT)) {
+      return;
+    }
+    String document;
+    String title;
+    try {
+      document = getDocument(content);
+      title = getTitle(content);
+    } catch (RuntimeException e) {
+      // Malformed page (e.g. missing <text> element): skip it.
+      // TODO: reporter.getCounter("Wikipedia", "Parse errors").increment(1);
+      return;
+    }
+
+    String catMatch = findMatchingCategory(document);
+    if (!all && "Unknown".equals(catMatch)) {
+      return;
+    }
+
+    document = StringEscapeUtils.unescapeHtml4(document);
+    if (removeLabels) {
+      document = removeCategoriesFromText(document);
+      // Reject documents with malformed tags
+      if (document == null) {
+        return;
+      }
+    }
+
+    // write out in Bayes input style: key: /Category/document_name
+    String category = "/" + catMatch.toLowerCase(Locale.ENGLISH) + "/" +
+        SPACE_NON_ALPHA_PATTERN.matcher(title).replaceAll("_");
+
+    context.write(new Text(category), new Text(document));
+  }
+
+  @Override
+  protected void setup(Context context) throws IOException, InterruptedException {
+    super.setup(context);
+    Configuration conf = context.getConfiguration();
+
+    Set<String> newCategories = new HashSet<>();
+    DefaultStringifier<Set<String>> setStringifier =
+        new DefaultStringifier<>(conf, GenericsUtil.getClass(newCategories));
+
+    // Default to an empty category set when "wikipedia.categories" is unset.
+    // Passing null to fromString would fail instead.
+    String categoriesStr = conf.get("wikipedia.categories", setStringifier.toString(newCategories));
+    inputCategories = setStringifier.fromString(categoriesStr);
+    exactMatchOnly = conf.getBoolean("exact.match.only", false);
+    all = conf.getBoolean("all.files", false);
+    removeLabels = conf.getBoolean("remove.labels", false);
+    log.info("Configure: Input Categories size: {} All: {} Exact Match: {} Remove Labels from Text: {}",
+             inputCategories.size(), all, exactMatchOnly, removeLabels);
+  }
+
+  /**
+   * Returns the raw content of the page's {@code <text>} element.
+   *
+   * @throws IllegalArgumentException if the opening or closing text tag is missing
+   */
+  private static String getDocument(String xml) {
+    Matcher startTag = START_DOC.matcher(xml);
+    if (!startTag.find()) {
+      throw new IllegalArgumentException("Page has no <text> element");
+    }
+    int start = startTag.end();
+    int end = xml.indexOf(END_DOC, start);
+    if (end < 0) {
+      throw new IllegalArgumentException("Page has no " + END_DOC + " tag");
+    }
+    return xml.substring(start, end);
+  }
+
+  /** Returns the content of the first {@code <title>} element, or "" if there is none. */
+  private static String getTitle(CharSequence xml) {
+    Matcher m = TITLE.matcher(xml);
+    return m.find() ? m.group(1) : "";
+  }
+
+  /**
+   * Returns the first configured category that one of the document's category tags matches.
+   * Matching is exact, or by substring when exact matching is off. The result is lower-cased.
+   * Returns {@code "Unknown"} when no tag matches.
+   */
+  private String findMatchingCategory(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+      categoryIndex += 11; // length of "[[Category:"
+      int endIndex = document.indexOf("]]", categoryIndex);
+      if (endIndex < 0) {
+        break;
+      }
+      String category = document.substring(categoryIndex, endIndex).toLowerCase(Locale.ENGLISH).trim();
+      if (exactMatchOnly && inputCategories.contains(category)) {
+        return category;
+      }
+      if (!exactMatchOnly) {
+        for (String inputCategory : inputCategories) {
+          if (category.contains(inputCategory)) { // we have an inexact match
+            return inputCategory.toLowerCase(Locale.ENGLISH);
+          }
+        }
+      }
+      startIndex = endIndex;
+    }
+    return "Unknown";
+  }
+
+  /**
+   * Removes every {@code [[Category:...]]} tag from the document.
+   * Returns null if an index error occurs while doing so.
+   */
+  private String removeCategoriesFromText(String document) {
+    int startIndex = 0;
+    int categoryIndex;
+    try {
+      while ((categoryIndex = document.indexOf("[[Category:", startIndex)) != -1) {
+        int endIndex = document.indexOf("]]", categoryIndex);
+        if (endIndex >= document.length() || endIndex < 0) {
+          break;
+        }
+        // String.replace removes every occurrence of this exact tag, not just the current one.
+        document = document.replace(document.substring(categoryIndex, endIndex + 2), "");
+        if (categoryIndex < document.length()) {
+          startIndex = categoryIndex;
+        } else {
+          break;
+        }
+      }
+    } catch (StringIndexOutOfBoundsException e) {
+      return null;
+    }
+    return document;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
new file mode 100644
index 0000000..fc065fe
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>The Bayes example package provides some helper classes for training the Naive Bayes classifier
+ * on the Twenty Newsgroups data. See {@code PrepareTwentyNewsgroups}
+ * for details on running the trainer and
+ * formatting the Twenty Newsgroups data properly for the training.</p>
+ *
+ * <p>The easiest way to prepare the data is to use the ant task in core/build.xml:</p>
+ *
+ * <p>{@code ant extract-20news-18828}</p>
+ *
+ * <p>This runs the arg line:</p>
+ *
+ * <p>{@code -p $\{working.dir\}/20news-18828/ -o $\{working.dir\}/20news-18828-collapse -a $\{analyzer\} -c UTF-8}</p>
+ *
+ * <p>To Run the Wikipedia examples (assumes you've built the Mahout Job jar):</p>
+ *
+ * <ol>
+ * <li>Download the Wikipedia Dataset. Use the Ant target: {@code ant enwiki-files}</li>
+ * <li>Chunk the data using the WikipediaXmlSplitter (from the Hadoop home):
+ * {@code bin/hadoop jar $MAHOUT_HOME/target/mahout-examples-0.x
+ * org.apache.mahout.classifier.bayes.WikipediaXmlSplitter
+ * -d $MAHOUT_HOME/examples/temp/enwiki-latest-pages-articles.xml
+ * -o $MAHOUT_HOME/examples/work/wikipedia/chunks/ -c 64}</li>
+ * </ol>
+ */
+public final class WikipediaXmlSplitter {
+
+ private static final Logger log = LoggerFactory.getLogger(WikipediaXmlSplitter.class);
+
+ private WikipediaXmlSplitter() { }
+
+ public static void main(String[] args) throws IOException {
+ DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+ ArgumentBuilder abuilder = new ArgumentBuilder();
+ GroupBuilder gbuilder = new GroupBuilder();
+
+ Option dumpFileOpt = obuilder.withLongName("dumpFile").withRequired(true).withArgument(
+ abuilder.withName("dumpFile").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The path to the wikipedia dump file (.bz2 or uncompressed)").withShortName("d").create();
+
+ Option outputDirOpt = obuilder.withLongName("outputDir").withRequired(true).withArgument(
+ abuilder.withName("outputDir").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The output directory to place the splits in:\n"
+ + "local files:\n\t/var/data/wikipedia-xml-chunks or\n\tfile:///var/data/wikipedia-xml-chunks\n"
+ + "Hadoop DFS:\n\thdfs://wikipedia-xml-chunks\n"
+ + "AWS S3 (blocks):\n\ts3://bucket-name/wikipedia-xml-chunks\n"
+ + "AWS S3 (native files):\n\ts3n://bucket-name/wikipedia-xml-chunks\n")
+
+ .withShortName("o").create();
+
+ Option s3IdOpt = obuilder.withLongName("s3ID").withRequired(false).withArgument(
+ abuilder.withName("s3Id").withMinimum(1).withMaximum(1).create()).withDescription("Amazon S3 ID key")
+ .withShortName("i").create();
+ Option s3SecretOpt = obuilder.withLongName("s3Secret").withRequired(false).withArgument(
+ abuilder.withName("s3Secret").withMinimum(1).withMaximum(1).create()).withDescription(
+ "Amazon S3 secret key").withShortName("s").create();
+
+ Option chunkSizeOpt = obuilder.withLongName("chunkSize").withRequired(true).withArgument(
+ abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
+ "The Size of the chunk, in megabytes").withShortName("c").create();
+ Option numChunksOpt = obuilder
+ .withLongName("numChunks")
+ .withRequired(false)
+ .withArgument(abuilder.withName("numChunks").withMinimum(1).withMaximum(1).create())
+ .withDescription(
+ "The maximum number of chunks to create. If specified, program will only create a subset of the chunks")
+ .withShortName("n").create();
+ Group group = gbuilder.withName("Options").withOption(dumpFileOpt).withOption(outputDirOpt).withOption(
+ chunkSizeOpt).withOption(numChunksOpt).withOption(s3IdOpt).withOption(s3SecretOpt).create();
+
+ Parser parser = new Parser();
+ parser.setGroup(group);
+ CommandLine cmdLine;
+ try {
+ cmdLine = parser.parse(args);
+ } catch (OptionException e) {
+ log.error("Error while parsing options", e);
+ CommandLineUtil.printHelp(group);
+ return;
+ }
+
+ Configuration conf = new Configuration();
+ String dumpFilePath = (String) cmdLine.getValue(dumpFileOpt);
+ String outputDirPath = (String) cmdLine.getValue(outputDirOpt);
+
+ if (cmdLine.hasOption(s3IdOpt)) {
+ String id = (String) cmdLine.getValue(s3IdOpt);
+ conf.set("fs.s3n.awsAccessKeyId", id);
+ conf.set("fs.s3.awsAccessKeyId", id);
+ }
+ if (cmdLine.hasOption(s3SecretOpt)) {
+ String secret = (String) cmdLine.getValue(s3SecretOpt);
+ conf.set("fs.s3n.awsSecretAccessKey", secret);
+ conf.set("fs.s3.awsSecretAccessKey", secret);
+ }
+ // do not compute crc file when using local FS
+ conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+ FileSystem fs = FileSystem.get(URI.create(outputDirPath), conf);
+
+ int chunkSize = 1024 * 1024 * Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
+
+ int numChunks = Integer.MAX_VALUE;
+ if (cmdLine.hasOption(numChunksOpt)) {
+ numChunks = Integer.parseInt((String) cmdLine.getValue(numChunksOpt));
+ }
+
+ String header = "<mediawiki xmlns=\"http://www.mediawiki.org/xml/export-0.3/\" "
+ + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" "
+ + "xsi:schemaLocation=\"http://www.mediawiki.org/xml/export-0.3/ "
+ + "http://www.mediawiki.org/xml/export-0.3.xsd\" " + "version=\"0.3\" "
+ + "xml:lang=\"en\">\n" + " <siteinfo>\n" + "<sitename>Wikipedia</sitename>\n"
+ + " <base>http://en.wikipedia.org/wiki/Main_Page</base>\n"
+ + " <generator>MediaWiki 1.13alpha</generator>\n" + " <case>first-letter</case>\n"
+ + " <namespaces>\n" + " <namespace key=\"-2\">Media</namespace>\n"
+ + " <namespace key=\"-1\">Special</namespace>\n" + " <namespace key=\"0\" />\n"
+ + " <namespace key=\"1\">Talk</namespace>\n"
+ + " <namespace key=\"2\">User</namespace>\n"
+ + " <namespace key=\"3\">User talk</namespace>\n"
+ + " <namespace key=\"4\">Wikipedia</namespace>\n"
+ + " <namespace key=\"5\">Wikipedia talk</namespace>\n"
+ + " <namespace key=\"6\">Image</namespace>\n"
+ + " <namespace key=\"7\">Image talk</namespace>\n"
+ + " <namespace key=\"8\">MediaWiki</namespace>\n"
+ + " <namespace key=\"9\">MediaWiki talk</namespace>\n"
+ + " <namespace key=\"10\">Template</namespace>\n"
+ + " <namespace key=\"11\">Template talk</namespace>\n"
+ + " <namespace key=\"12\">Help</namespace>\n"
+ + " <namespace key=\"13\">Help talk</namespace>\n"
+ + " <namespace key=\"14\">Category</namespace>\n"
+ + " <namespace key=\"15\">Category talk</namespace>\n"
+ + " <namespace key=\"100\">Portal</namespace>\n"
+ + " <namespace key=\"101\">Portal talk</namespace>\n" + " </namespaces>\n"
+ + " </siteinfo>\n";
+
+ StringBuilder content = new StringBuilder();
+ content.append(header);
+ NumberFormat decimalFormatter = new DecimalFormat("0000");
+ File dumpFile = new File(dumpFilePath);
+
+ // If the specified path for the input file is incorrect, return immediately
+ if (!dumpFile.exists()) {
+ log.error("Input file path {} doesn't exist", dumpFilePath);
+ return;
+ }
+
+ FileLineIterator it;
+ if (dumpFilePath.endsWith(".bz2")) {
+ // default compression format from http://download.wikimedia.org
+ CompressionCodec codec = new BZip2Codec();
+ it = new FileLineIterator(codec.createInputStream(new FileInputStream(dumpFile)));
+ } else {
+ // assume the user has previously de-compressed the dump file
+ it = new FileLineIterator(dumpFile);
+ }
+ int fileNumber = 0;
+ while (it.hasNext()) {
+ String thisLine = it.next();
+ if (thisLine.trim().startsWith("<page>")) {
+ boolean end = false;
+ while (!thisLine.trim().startsWith("</page>")) {
+ content.append(thisLine).append('\n');
+ if (it.hasNext()) {
+ thisLine = it.next();
+ } else {
+ end = true;
+ break;
+ }
+ }
+ content.append(thisLine).append('\n');
+
+ if (content.length() > chunkSize || end) {
+ content.append("</mediawiki>");
+ fileNumber++;
+ String filename = outputDirPath + "/chunk-" + decimalFormatter.format(fileNumber) + ".xml";
+ try (BufferedWriter chunkWriter =
+ new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filename)), "UTF-8"))) {
+ chunkWriter.write(content.toString(), 0, content.length());
+ }
+ if (fileNumber >= numChunks) {
+ break;
+ }
+ content = new StringBuilder();
+ content.append(header);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
new file mode 100644
index 0000000..afd350f
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ *
+ * <p>The tags are taken from the job configuration keys {@link #START_TAG_KEY} and
+ * {@link #END_TAG_KEY}. Each record value holds the raw bytes from the start tag through the
+ * end tag, inclusive; the record key is the stream position just after the end tag.
+ */
+public class XmlInputFormat extends TextInputFormat {
+
+  private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);
+
+  public static final String START_TAG_KEY = "xmlinput.start";
+  public static final String END_TAG_KEY = "xmlinput.end";
+
+  /**
+   * Creates an {@link XmlRecordReader} for the given split.
+   *
+   * <p>NOTE(review): if opening the split fails this logs a warning and returns {@code null},
+   * which the caller will presumably trip over later. It is kept this way so this override does
+   * not start declaring a checked exception; consider propagating the IOException instead.
+   */
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+    try {
+      return new XmlRecordReader((FileSplit) split, context.getConfiguration());
+    } catch (IOException ioe) {
+      log.warn("Error while creating XmlRecordReader", ioe);
+      return null;
+    }
+  }
+
+  /**
+   * XMLRecordReader class to read through a given xml document to output xml blocks as records as
+   * specified by the start tag and end tag.
+   *
+   * <p>A record belongs to this split if its start tag begins before the split end; reading that
+   * record may continue past the split end until its end tag is found.
+   *
+   * <p>After a mismatch, tag matching restarts by re-testing the current byte against the first
+   * tag byte. That is exact for tags whose first byte does not recur inside the tag, which holds
+   * for XML tags such as {@code <page>} or {@code </page>}.
+   */
+  public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
+
+    private final byte[] startTag;
+    private final byte[] endTag;
+    private final long start;
+    private final long end;
+    private final FSDataInputStream fsin;
+    private final DataOutputBuffer buffer = new DataOutputBuffer();
+    private LongWritable currentKey;
+    private Text currentValue;
+
+    /**
+     * Opens the split's file and positions the stream at the split start.
+     *
+     * @throws IllegalArgumentException if either tag is missing or empty in {@code conf}
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
+      startTag = requiredTag(conf, START_TAG_KEY);
+      endTag = requiredTag(conf, END_TAG_KEY);
+
+      // open the file and seek to the start of the split
+      start = split.getStart();
+      end = start + split.getLength();
+      Path file = split.getPath();
+      FileSystem fs = file.getFileSystem(conf);
+      fsin = fs.open(file);
+      try {
+        fsin.seek(start);
+      } catch (IOException e) {
+        // don't leak the freshly opened stream when construction fails
+        Closeables.close(fsin, true);
+        throw e;
+      }
+    }
+
+    /** Returns the UTF-8 bytes of the tag stored under {@code key}, rejecting missing/empty values. */
+    private static byte[] requiredTag(Configuration conf, String key) {
+      String tag = conf.get(key);
+      if (tag == null || tag.isEmpty()) {
+        throw new IllegalArgumentException("Missing or empty configuration property: " + key);
+      }
+      return tag.getBytes(Charsets.UTF_8);
+    }
+
+    /**
+     * Reads the next start..end record into {@code key} and {@code value}.
+     *
+     * @return true if a complete record was read; false at the end of the split or file
+     */
+    private boolean next(LongWritable key, Text value) throws IOException {
+      if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
+        try {
+          buffer.write(startTag);
+          if (readUntilMatch(endTag, true)) {
+            key.set(fsin.getPos());
+            value.set(buffer.getData(), 0, buffer.getLength());
+            return true;
+          }
+        } finally {
+          buffer.reset();
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      Closeables.close(fsin, true);
+    }
+
+    /**
+     * Returns the fraction of the split consumed, clamped to at most 1. The last record may be
+     * read past the split end, and an empty split would otherwise divide by zero.
+     */
+    @Override
+    public float getProgress() throws IOException {
+      if (end == start) {
+        return 0.0f;
+      }
+      return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
+    }
+
+    /**
+     * Scans forward until {@code match} has been read.
+     *
+     * @param match the tag bytes to look for (non-empty)
+     * @param withinBlock if true, every byte read is appended to {@link #buffer} and the split end
+     *     is ignored; if false, scanning stops once the current candidate match would start at or
+     *     after the split end (that tag belongs to the next split)
+     * @return true if the tag was found; false on end of file or when the split end is passed
+     */
+    private boolean readUntilMatch(byte[] match, boolean withinBlock) throws IOException {
+      int i = 0;
+      while (true) {
+        int b = fsin.read();
+        // end of file:
+        if (b == -1) {
+          return false;
+        }
+        // save to buffer:
+        if (withinBlock) {
+          buffer.write(b);
+        }
+
+        // check if we're matching; read() yields 0..255, so compare as bytes to match
+        // non-ASCII (negative) tag bytes correctly
+        if ((byte) b == match[i]) {
+          i++;
+          if (i >= match.length) {
+            return true;
+          }
+        } else {
+          // the mismatching byte may itself begin a new match, e.g. "<<page>"
+          i = (byte) b == match[0] ? 1 : 0;
+        }
+        // see if we've passed the stop point: the partial match (if any) began at getPos() - i
+        if (!withinBlock && fsin.getPos() - i >= end) {
+          return false;
+        }
+      }
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+      return currentKey;
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return currentValue;
+    }
+
+    /** No-op: the reader is fully set up by its constructor in {@code createRecordReader}. */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      currentKey = new LongWritable();
+      currentValue = new Text();
+      return next(currentKey, currentValue);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java
new file mode 100644
index 0000000..1c55090
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/Bump125.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+/**
+ * Helps with making nice intervals at arbitrary scale.
+ *
+ * <p>One use case is producing progress or error messages as records arrive. Emitting a message
+ * for <i>every</i> input is generally bad form, so {@link #increment()} instead yields every
+ * count up to 10, then steps of 2 up to 20, steps of 5 up to 40, steps of 10 up to 80, and so
+ * on. Each step is roughly a quarter of the current count, rounded down to a 1-2-5 value times a
+ * power of ten. The pattern therefore repeats, scaled up by 10 each decade. The number of
+ * messages grows with the log of the number of inputs, and early records all get reported.
+ */
+public class Bump125 {
+  /** Allowed leading digits for a step size, in ascending order. */
+  private static final int[] BUMPS = {1, 2, 5};
+
+  /**
+   * Returns the largest entry of {@code BUMPS} that does not exceed {@code value / base}, or the
+   * smallest entry when none does.
+   */
+  static int scale(double value, double base) {
+    double ratio = value / base;
+    for (int k = BUMPS.length - 1; k > 0; k--) {
+      if (BUMPS[k] <= ratio) {
+        return BUMPS[k];
+      }
+    }
+    return BUMPS[0];
+  }
+
+  /** Returns the power of ten obtained from floor(log10(value)), but never less than 1. */
+  static long base(double value) {
+    int exponent = (int) Math.floor(Math.log10(value));
+    long power = (long) Math.pow(10, exponent);
+    return power < 1 ? 1 : power;
+  }
+
+  private long counter = 0;
+
+  /** Advances the counter by the current step size and returns the new count. */
+  public long increment() {
+    if (counter < 10) {
+      counter += 1;
+    } else {
+      double quarter = counter / 4.0;
+      long magnitude = base(quarter);
+      counter += magnitude * scale(quarter, magnitude);
+    }
+    return counter;
+  }
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java
new file mode 100644
index 0000000..f63de83
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/MatrixDumper.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.MatrixWritable;
+
+/**
+ * Export a Matrix in various text formats:
+ * * CSV file
+ *
+ * Input format: Hadoop SequenceFile with Text key and MatrixWritable value, 1 pair
+ * TODO:
+ * Needs class for key value- should not hard-code to Text.
+ * Options for row and column headers- stats software can be picky.
+ * Assumes only one matrix in a file.
+ */
+public final class MatrixDumper extends AbstractJob {
+
+ private MatrixDumper() { }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new MatrixDumper(), args);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ addInputOption();
+ addOption("output", "o", "Output path", null); // AbstractJob output feature requires param
+ Map<String, List<String>> parsedArgs = parseArguments(args);
+ if (parsedArgs == null) {
+ return -1;
+ }
+ String outputFile = hasOption("output") ? getOption("output") : null;
+ exportCSV(getInputPath(), outputFile, false);
+ return 0;
+ }
+
+ private static void exportCSV(Path inputPath, String outputFile, boolean doLabels) throws IOException {
+ SequenceFileValueIterator<MatrixWritable> it =
+ new SequenceFileValueIterator<>(inputPath, true, new Configuration());
+ Matrix m = it.next().get();
+ it.close();
+ PrintStream ps = getPrintStream(outputFile);
+ String[] columnLabels = getLabels(m.numCols(), m.getColumnLabelBindings(), "col");
+ String[] rowLabels = getLabels(m.numRows(), m.getRowLabelBindings(), "row");
+ if (doLabels) {
+ ps.print("rowid,");
+ ps.print(columnLabels[0]);
+ for (int c = 1; c < m.numCols(); c++) {
+ ps.print(',' + columnLabels[c]);
+ }
+ ps.println();
+ }
+ for (int r = 0; r < m.numRows(); r++) {
+ if (doLabels) {
+ ps.print(rowLabels[0] + ',');
+ }
+ ps.print(Double.toString(m.getQuick(r,0)));
+ for (int c = 1; c < m.numCols(); c++) {
+ ps.print(",");
+ ps.print(Double.toString(m.getQuick(r,c)));
+ }
+ ps.println();
+ }
+ if (ps != System.out) {
+ ps.close();
+ }
+ }
+
+ private static PrintStream getPrintStream(String outputPath) throws IOException {
+ if (outputPath == null) {
+ return System.out;
+ }
+ File outputFile = new File(outputPath);
+ if (outputFile.exists()) {
+ outputFile.delete();
+ }
+ outputFile.createNewFile();
+ OutputStream os = new FileOutputStream(outputFile);
+ return new PrintStream(os, false, Charsets.UTF_8.displayName());
+ }
+
+ /**
+ * return the label set, sorted by matrix order
+ * if there are no labels, fabricate them using the starter string
+ * @param length
+ */
+ private static String[] getLabels(int length, Map<String,Integer> labels, String start) {
+ if (labels != null) {
+ return sortLabels(labels);
+ }
+ String[] sorted = new String[length];
+ for (int i = 1; i <= length; i++) {
+ sorted[i] = start + i;
+ }
+ return sorted;
+ }
+
+ private static String[] sortLabels(Map<String,Integer> labels) {
+ String[] sorted = new String[labels.size()];
+ for (Map.Entry<String,Integer> entry : labels.entrySet()) {
+ sorted[entry.getValue()] = entry.getKey();
+ }
+ return sorted;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
new file mode 100644
index 0000000..e01868a
--- /dev/null
+++ b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/SequenceFileDumper.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils;
+
+import java.io.File;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterator;
+import org.apache.mahout.math.list.IntArrayList;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+/**
+ * Prints the records of a SequenceFile, or of every file in a directory that passes
+ * {@code PathFilters.logsCRCFilter()}, to a UTF-8 text file or to standard out.
+ *
+ * <p>Options:
+ * <ul>
+ *   <li>{@code --substring} truncates each printed value.</li>
+ *   <li>{@code --count} prints only the record count, plus facets if requested.</li>
+ *   <li>{@code --numItems} limits the records printed per file.</li>
+ *   <li>{@code --facets} prints per-key counts.</li>
+ *   <li>{@code --quiet} suppresses the informational header and summary lines.</li>
+ * </ul>
+ */
+public final class SequenceFileDumper extends AbstractJob {
+
+  public SequenceFileDumper() {
+    setConf(new Configuration());
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("substring", "b", "The number of chars to print out per value", false);
+    addOption(buildOption("count", "c", "Report the count only", false, false, null));
+    addOption("numItems", "n", "Output at most <n> key value pairs", false);
+    addOption(buildOption("facets", "fa", "Output the counts per key. Note, if there are a lot of unique keys, "
+        + "this can take up a fair amount of memory", false, false, null));
+    addOption(buildOption("quiet", "q", "Print only file contents.", false, false, null));
+
+    if (parseArguments(args, false, true) == null) {
+      return -1;
+    }
+
+    Configuration conf = new Configuration();
+    Path input = getInputPath();
+    FileSystem fs = input.getFileSystem(conf);
+    Path[] pathArr;
+    if (fs.getFileStatus(input).isDir()) {
+      pathArr = FileUtil.stat2Paths(fs.listStatus(input, PathFilters.logsCRCFilter()));
+    } else {
+      pathArr = new Path[] {input};
+    }
+
+    // the options are identical for every input file, so evaluate them once
+    boolean quiet = hasOption("quiet");
+    boolean countOnly = hasOption("count");
+    boolean doFacets = hasOption("facets");
+    int sub = hasOption("substring") ? Integer.parseInt(getOption("substring")) : Integer.MAX_VALUE;
+    boolean limitItems = !countOnly && hasOption("numItems");
+    long numItems = limitItems ? Long.parseLong(getOption("numItems")) : Long.MAX_VALUE;
+
+    Writer writer;
+    boolean shouldClose;
+    if (hasOption("output")) {
+      shouldClose = true;
+      writer = Files.newWriter(new File(getOption("output")), Charsets.UTF_8);
+    } else {
+      shouldClose = false;
+      writer = new OutputStreamWriter(System.out, Charsets.UTF_8);
+    }
+    try {
+      for (Path path : pathArr) {
+        if (!quiet) {
+          writer.append("Input Path: ").append(String.valueOf(path)).append('\n');
+        }
+        SequenceFileIterator<?, ?> iterator = new SequenceFileIterator<>(path, true, conf);
+        try {
+          if (!quiet) {
+            writer.append("Key class: ").append(iterator.getKeyClass().toString());
+            writer.append(" Value Class: ").append(iterator.getValueClass().toString()).append('\n');
+          }
+          OpenObjectIntHashMap<String> facets = null;
+          if (doFacets) {
+            facets = new OpenObjectIntHashMap<>();
+          }
+          long count = 0;
+          if (countOnly) {
+            while (iterator.hasNext()) {
+              Pair<?, ?> record = iterator.next();
+              if (facets != null) {
+                facets.adjustOrPutValue(record.getFirst().toString(), 1, 1); //either insert or add 1
+              }
+              count++;
+            }
+            writer.append("Count: ").append(String.valueOf(count)).append('\n');
+          } else {
+            if (limitItems && !quiet) {
+              writer.append("Max Items to dump: ").append(String.valueOf(numItems)).append("\n");
+            }
+            while (iterator.hasNext() && count < numItems) {
+              Pair<?, ?> record = iterator.next();
+              String key = record.getFirst().toString();
+              writer.append("Key: ").append(key);
+              String str = record.getSecond().toString();
+              writer.append(": Value: ").append(str.length() > sub ? str.substring(0, sub) : str);
+              writer.write('\n');
+              if (facets != null) {
+                facets.adjustOrPutValue(key, 1, 1); //either insert or add 1
+              }
+              count++;
+            }
+            if (!quiet) {
+              writer.append("Count: ").append(String.valueOf(count)).append('\n');
+            }
+          }
+          if (facets != null) {
+            List<String> keyList = new ArrayList<>(facets.size());
+            IntArrayList valueList = new IntArrayList(facets.size());
+            facets.pairsSortedByKey(keyList, valueList);
+            writer.append("-----Facets---\n");
+            writer.append("Key\t\tCount\n");
+            int i = 0;
+            for (String key : keyList) {
+              writer.append(key).append("\t\t").append(String.valueOf(valueList.get(i++))).append('\n');
+            }
+          }
+        } finally {
+          // release this file's reader even when --numItems stops early or writing fails
+          Closeables.close(iterator, true);
+        }
+      }
+      writer.flush();
+    } finally {
+      if (shouldClose) {
+        Closeables.close(writer, false);
+      }
+    }
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    new SequenceFileDumper().run(args);
+  }
+
+}