You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ro...@apache.org on 2012/05/13 23:55:27 UTC
svn commit: r1337989 - in
/mahout/trunk/integration/src/main/java/org/apache/mahout/text: ./ wikipedia/
Author: robinanil
Date: Sun May 13 21:55:26 2012
New Revision: 1337989
URL: http://svn.apache.org/viewvc?rev=1337989&view=rev
Log:
MAHOUT-1010 bringing back deleted wikipedia dataset creator and moving it to integration
Added:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/
mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/WikipediaToSequenceFile.java Sun May 13 21:55:26 2012
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.text.wikipedia.WikipediaMapper;
+import org.apache.mahout.text.wikipedia.XmlInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create and run the Wikipedia Dataset Creator.
+ */
+public final class WikipediaToSequenceFile {
+
+ private static final Logger log = LoggerFactory.getLogger(WikipediaToSequenceFile.class);
+
+ private WikipediaToSequenceFile() { }
+
+ /**
+  * Command-line entry point: parses the options and delegates to
+  * {@link #runJob(String, String, String, boolean, boolean)}.
+  * <ul>
+  * <li>input option: the {@link org.apache.hadoop.fs.Path} where the input documents live</li>
+  * <li>output option: the {@link org.apache.hadoop.fs.Path} where the
+  * {@link org.apache.hadoop.io.SequenceFile} is written</li>
+  * <li>{@code --categories} / {@code -c}: optional categories file, one entry per line</li>
+  * <li>{@code --exactMatch} / {@code -e}: category names must match the file entries exactly</li>
+  * <li>{@code --all}: select all files</li>
+  * </ul>
+  * An {@link OptionException}, {@link InterruptedException} or {@link ClassNotFoundException}
+  * is logged and followed by the usage text; an {@link IOException} propagates to the caller.
+  */
+ public static void main(String[] args) throws IOException {
+   DefaultOptionBuilder optionBuilder = new DefaultOptionBuilder();
+   ArgumentBuilder argumentBuilder = new ArgumentBuilder();
+   GroupBuilder groupBuilder = new GroupBuilder();
+
+   Option inputOpt = DefaultOptionCreator.inputOption().create();
+   Option outputOpt = DefaultOptionCreator.outputOption().create();
+
+   Option categoriesOpt = optionBuilder
+       .withLongName("categories")
+       .withArgument(argumentBuilder.withName("categories").withMinimum(1).withMaximum(1).create())
+       .withDescription("Location of the categories file. One entry per line. "
+           + "Will be used to make a string match in Wikipedia Category field")
+       .withShortName("c")
+       .create();
+
+   Option exactMatchOpt = optionBuilder
+       .withLongName("exactMatch")
+       .withDescription("If set, then the category name must exactly match the "
+           + "entry in the categories file. Default is false")
+       .withShortName("e")
+       .create();
+
+   Option allOpt = optionBuilder
+       .withLongName("all")
+       .withDescription("If set, Select all files. Default is false")
+       .withShortName("all")
+       .create();
+
+   Option helpOpt = DefaultOptionCreator.helpOption();
+
+   Group options = groupBuilder.withName("Options")
+       .withOption(categoriesOpt)
+       .withOption(inputOpt)
+       .withOption(outputOpt)
+       .withOption(exactMatchOpt)
+       .withOption(allOpt)
+       .withOption(helpOpt)
+       .create();
+
+   Parser parser = new Parser();
+   parser.setGroup(options);
+   parser.setHelpOption(helpOpt);
+   try {
+     CommandLine parsed = parser.parse(args);
+     if (parsed.hasOption(helpOpt)) {
+       CommandLineUtil.printHelp(options);
+       return;
+     }
+
+     // An empty string means "no categories file supplied".
+     String catFile = parsed.hasOption(categoriesOpt) ? (String) parsed.getValue(categoriesOpt) : "";
+     boolean exactMatch = parsed.hasOption(exactMatchOpt);
+     boolean all = parsed.hasOption(allOpt);
+
+     runJob((String) parsed.getValue(inputOpt), (String) parsed.getValue(outputOpt), catFile, exactMatch, all);
+   } catch (OptionException e) {
+     log.error("Exception", e);
+     CommandLineUtil.printHelp(options);
+   } catch (InterruptedException e) {
+     log.error("Exception", e);
+     CommandLineUtil.printHelp(options);
+   } catch (ClassNotFoundException e) {
+     log.error("Exception", e);
+     CommandLineUtil.printHelp(options);
+   }
+ }
+
+ /**
+  * Configures and runs the map-reduce job that reads Wikipedia {@code <page>} elements
+  * and writes them out through {@link SequenceFileOutputFormat} with {@link Text} keys
+  * and values.
+  *
+  * @param input
+  *          the input pathname String
+  * @param output
+  *          the output pathname String; any existing content at this path is deleted first
+  * @param catFile
+  *          the local file containing the Wikipedia categories, one per line; an empty
+  *          string means no categories file
+  * @param exactMatchOnly
+  *          if true, then the Wikipedia category must match exactly instead of simply containing the
+  *          category string
+  * @param all
+  *          if true select all categories
+  * @throws IllegalStateException
+  *           if the job does not complete successfully
+  */
+ public static void runJob(String input,
+                           String output,
+                           String catFile,
+                           boolean exactMatchOnly,
+                           boolean all) throws IOException, InterruptedException, ClassNotFoundException {
+   Configuration conf = new Configuration();
+   conf.set("xmlinput.start", "<page>");
+   conf.set("xmlinput.end", "</page>");
+   conf.setBoolean("exact.match.only", exactMatchOnly);
+   conf.setBoolean("all.files", all);
+   // JavaSerialization is needed so that DefaultStringifier can serialize the category set.
+   conf.set("io.serializations",
+            "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+   Set<String> categories = new HashSet<String>();
+   if (!catFile.isEmpty()) {
+     for (String line : new FileLineIterable(new File(catFile))) {
+       String category = line.trim().toLowerCase(Locale.ENGLISH);
+       // A blank line would yield an empty category, which is a substring of every
+       // category name and would therefore select every categorized page.
+       if (!category.isEmpty()) {
+         categories.add(category);
+       }
+     }
+   }
+
+   DefaultStringifier<Set<String>> setStringifier =
+       new DefaultStringifier<Set<String>>(conf, GenericsUtil.getClass(categories));
+
+   // The categories must be stored BEFORE the Job is created: Job takes its own copy
+   // of the Configuration, so values set on conf afterwards never reach the mappers.
+   conf.set("wikipedia.categories", setStringifier.toString(categories));
+
+   Job job = new Job(conf);
+   if (log.isInfoEnabled()) {
+     log.info("Input: {} Out: {} Categories: {} All Files: {}", new Object[] {input, output, catFile, all});
+   }
+   job.setOutputKeyClass(Text.class);
+   job.setOutputValueClass(Text.class);
+   FileInputFormat.setInputPaths(job, new Path(input));
+   Path outPath = new Path(output);
+   FileOutputFormat.setOutputPath(job, outPath);
+   job.setMapperClass(WikipediaMapper.class);
+   job.setInputFormatClass(XmlInputFormat.class);
+   job.setReducerClass(Reducer.class);
+   job.setOutputFormatClass(SequenceFileOutputFormat.class);
+   job.setJarByClass(WikipediaToSequenceFile.class);
+
+   /*
+    * conf.set("mapred.compress.map.output", "true"); conf.set("mapred.map.output.compression.type",
+    * "BLOCK"); conf.set("mapred.output.compress", "true"); conf.set("mapred.output.compression.type",
+    * "BLOCK"); conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec");
+    */
+   HadoopUtil.delete(conf, outPath);
+
+   boolean succeeded = job.waitForCompletion(true);
+   if (!succeeded) {
+     throw new IllegalStateException("Job failed!");
+   }
+ }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorDriver.java Sun May 13 21:55:26 2012
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.analysis.WikipediaAnalyzer;
+import org.apache.mahout.common.ClassUtils;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Create and run the Wikipedia Dataset Creator.
+ */
+public final class WikipediaDatasetCreatorDriver {
+ private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorDriver.class);
+
+ private WikipediaDatasetCreatorDriver() { }
+
+ /**
+  * Command-line entry point: parses the options and delegates to
+  * {@link #runJob(String, String, String, boolean, Class)}.
+  * <ul>
+  * <li>input option: the {@link org.apache.hadoop.fs.Path} where the input documents live</li>
+  * <li>output option: the {@link org.apache.hadoop.fs.Path} where the job output is written
+  * (the job uses {@link TextOutputFormat})</li>
+  * <li>{@code --categories} / {@code -c}: required categories file, one entry per line</li>
+  * <li>{@code --exactMatch} / {@code -e}: category names must match the file entries exactly</li>
+  * <li>{@code --analyzer} / {@code -a}: optional {@link Analyzer} class name with a no-argument
+  * constructor; defaults to {@link WikipediaAnalyzer}</li>
+  * </ul>
+  * An {@link OptionException} or {@link ClassNotFoundException} is logged and followed by the
+  * usage text.
+  */
+ public static void main(String[] args) throws IOException, InterruptedException {
+   DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+   ArgumentBuilder abuilder = new ArgumentBuilder();
+   GroupBuilder gbuilder = new GroupBuilder();
+
+   Option dirInputPathOpt = DefaultOptionCreator.inputOption().create();
+
+   Option dirOutputPathOpt = DefaultOptionCreator.outputOption().create();
+
+   Option categoriesOpt = obuilder.withLongName("categories").withRequired(true).withArgument(
+     abuilder.withName("categories").withMinimum(1).withMaximum(1).create()).withDescription(
+     "Location of the categories file. One entry per line. "
+         + "Will be used to make a string match in Wikipedia Category field").withShortName("c").create();
+
+   Option exactMatchOpt = obuilder.withLongName("exactMatch").withDescription(
+     "If set, then the category name must exactly match the "
+         + "entry in the categories file. Default is false").withShortName("e").create();
+   Option analyzerOpt = obuilder.withLongName("analyzer").withRequired(false).withArgument(
+     abuilder.withName("analyzer").withMinimum(1).withMaximum(1).create()).withDescription(
+     "The analyzer to use, must have a no argument constructor").withShortName("a").create();
+   Option helpOpt = DefaultOptionCreator.helpOption();
+
+   Group group = gbuilder.withName("Options").withOption(categoriesOpt).withOption(dirInputPathOpt)
+       .withOption(dirOutputPathOpt).withOption(exactMatchOpt).withOption(analyzerOpt).withOption(helpOpt)
+       .create();
+
+   Parser parser = new Parser();
+   parser.setGroup(group);
+   // Register the help option so that asking for help on its own is not rejected for
+   // lacking the required options (same setup as WikipediaToSequenceFile).
+   parser.setHelpOption(helpOpt);
+   try {
+     CommandLine cmdLine = parser.parse(args);
+     if (cmdLine.hasOption(helpOpt)) {
+       CommandLineUtil.printHelp(group);
+       return;
+     }
+
+     String inputPath = (String) cmdLine.getValue(dirInputPathOpt);
+     String outputPath = (String) cmdLine.getValue(dirOutputPathOpt);
+     String catFile = (String) cmdLine.getValue(categoriesOpt);
+     Class<? extends Analyzer> analyzerClass = WikipediaAnalyzer.class;
+     if (cmdLine.hasOption(analyzerOpt)) {
+       String className = cmdLine.getValue(analyzerOpt).toString();
+       analyzerClass = Class.forName(className).asSubclass(Analyzer.class);
+       // try instantiating it, b/c there isn't any point in setting it if
+       // you can't instantiate it
+       ClassUtils.instantiateAs(analyzerClass, Analyzer.class);
+     }
+     runJob(inputPath, outputPath, catFile, cmdLine.hasOption(exactMatchOpt),
+            analyzerClass);
+   } catch (OptionException e) {
+     log.error("Exception", e);
+     CommandLineUtil.printHelp(group);
+   } catch (ClassNotFoundException e) {
+     log.error("Exception", e);
+     CommandLineUtil.printHelp(group);
+   }
+ }
+
+ /**
+  * Configures and runs the map-reduce job that extracts the Wikipedia pages belonging to
+  * the given categories and writes them out as text.
+  *
+  * @param input
+  *          the input pathname String
+  * @param output
+  *          the output pathname String; any existing content at this path is deleted first
+  * @param catFile
+  *          the local file containing the Wikipedia categories, one per line
+  * @param exactMatchOnly
+  *          if true, then the Wikipedia category must match exactly instead of simply containing the
+  *          category string
+  * @param analyzerClass
+  *          the {@link Analyzer} implementation whose class name is passed to the mappers
+  *          through the {@code analyzer.class} configuration key
+  * @throws IllegalStateException
+  *           if the job does not complete successfully
+  */
+ public static void runJob(String input,
+                           String output,
+                           String catFile,
+                           boolean exactMatchOnly,
+                           Class<? extends Analyzer> analyzerClass)
+   throws IOException, InterruptedException, ClassNotFoundException {
+   Configuration conf = new Configuration();
+   conf.set("key.value.separator.in.input.line", " ");
+   conf.set("xmlinput.start", "<page>");
+   conf.set("xmlinput.end", "</page>");
+   conf.setBoolean("exact.match.only", exactMatchOnly);
+   conf.set("analyzer.class", analyzerClass.getName());
+   // Dont ever forget this. People should keep track of how hadoop conf
+   // parameters can make or break a piece of code: JavaSerialization is needed
+   // so that DefaultStringifier can serialize the category set below.
+   conf.set("io.serializations",
+            "org.apache.hadoop.io.serializer.JavaSerialization,"
+            + "org.apache.hadoop.io.serializer.WritableSerialization");
+
+   Set<String> categories = new HashSet<String>();
+   for (String line : new FileLineIterable(new File(catFile))) {
+     String category = line.trim().toLowerCase(Locale.ENGLISH);
+     // A blank line would yield an empty category whose match pattern accepts
+     // practically every category name, so skip it.
+     if (!category.isEmpty()) {
+       categories.add(category);
+     }
+   }
+
+   DefaultStringifier<Set<String>> setStringifier =
+       new DefaultStringifier<Set<String>>(conf, GenericsUtil.getClass(categories));
+
+   String categoriesStr = setStringifier.toString(categories);
+
+   // Must happen before the Job is created, because Job works on its own copy of conf.
+   conf.set("wikipedia.categories", categoriesStr);
+
+   Job job = new Job(conf);
+   if (log.isInfoEnabled()) {
+     log.info("Input: {} Out: {} Categories: {}", new Object[] {input, output, catFile});
+   }
+   job.setJarByClass(WikipediaDatasetCreatorDriver.class);
+   job.setOutputKeyClass(Text.class);
+   job.setOutputValueClass(Text.class);
+   job.setMapperClass(WikipediaDatasetCreatorMapper.class);
+   //TODO: job.setNumMapTasks(100);
+   job.setInputFormatClass(XmlInputFormat.class);
+   job.setReducerClass(WikipediaDatasetCreatorReducer.class);
+   job.setOutputFormatClass(TextOutputFormat.class);
+
+   FileInputFormat.setInputPaths(job, new Path(input));
+   Path outPath = new Path(output);
+   FileOutputFormat.setOutputPath(job, outPath);
+   HadoopUtil.delete(conf, outPath);
+
+   boolean succeeded = job.waitForCompletion(true);
+   if (!succeeded) {
+     throw new IllegalStateException("Job failed!");
+   }
+ }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorMapper.java Sun May 13 21:55:26 2012
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.analysis.WikipediaAnalyzer;
+import org.apache.mahout.common.ClassUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps over Wikipedia xml format and output all document having the category listed in the input category
+ * file
+ *
+ */
+public class WikipediaDatasetCreatorMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+ private static final Logger log = LoggerFactory.getLogger(WikipediaDatasetCreatorMapper.class);
+
+ private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s\\W]");
+ private static final Pattern OPEN_TEXT_TAG_PATTERN = Pattern.compile("<text xml:space=\"preserve\">");
+ private static final Pattern CLOSE_TEXT_TAG_PATTERN = Pattern.compile("</text>");
+
+ private List<String> inputCategories;
+ private List<Pattern> inputCategoryPatterns;
+ private boolean exactMatchOnly;
+ private Analyzer analyzer;
+
+ /**
+  * Strips the {@code <text>} tags from one Wikipedia page, unescapes its HTML entities
+  * and, if the page belongs to one of the configured categories, emits the matched
+  * category (whitespace and non-word characters replaced by {@code _}) as key and the
+  * space-separated analyzer tokens of the page as value. Pages without a matching
+  * category are dropped.
+  */
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+   String document = value.toString();
+   document = StringEscapeUtils.unescapeHtml(CLOSE_TEXT_TAG_PATTERN.matcher(
+       OPEN_TEXT_TAG_PATTERN.matcher(document).replaceFirst("")).replaceAll(""));
+   String catMatch = findMatchingCategory(document);
+   if (!"Unknown".equals(catMatch)) {
+     StringBuilder contents = new StringBuilder(1000);
+     TokenStream stream = analyzer.reusableTokenStream(catMatch, new StringReader(document));
+     try {
+       CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
+       stream.reset();
+       while (stream.incrementToken()) {
+         contents.append(termAtt.buffer(), 0, termAtt.length()).append(' ');
+       }
+       // Complete the TokenStream consumer workflow: end() after the last token ...
+       stream.end();
+     } finally {
+       // ... and close() to release the stream's resources, even on failure.
+       stream.close();
+     }
+     context.write(
+         new Text(SPACE_NON_ALPHA_PATTERN.matcher(catMatch).replaceAll("_")),
+         new Text(contents.toString()));
+   }
+ }
+
+ /**
+  * Reads the mapper settings from the job configuration: the serialized category set
+  * ({@code wikipedia.categories}, defaulting to an empty set), the exact-match flag
+  * ({@code exact.match.only}, default false) and the analyzer class
+  * ({@code analyzer.class}, default {@link WikipediaAnalyzer}). For every category a
+  * whole-word match pattern is precompiled for the inexact matching mode.
+  */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+   super.setup(context);
+
+   Configuration conf = context.getConfiguration();
+
+   if (inputCategories == null) {
+     Set<String> newCategories = new HashSet<String>();
+     DefaultStringifier<Set<String>> setStringifier =
+         new DefaultStringifier<Set<String>>(conf, GenericsUtil.getClass(newCategories));
+     String categoriesStr = conf.get("wikipedia.categories", setStringifier.toString(newCategories));
+     Set<String> inputCategoriesSet = setStringifier.fromString(categoriesStr);
+     inputCategories = Lists.newArrayList(inputCategoriesSet);
+     inputCategoryPatterns = Lists.newArrayListWithCapacity(inputCategories.size());
+     for (String inputCategory : inputCategories) {
+       // Quote the category so it is matched as literal text: names containing regex
+       // metacharacters such as '(', '+' or '.' would otherwise fail to compile or
+       // match unintended categories.
+       inputCategoryPatterns.add(Pattern.compile(".*\\b" + Pattern.quote(inputCategory) + "\\b.*"));
+     }
+
+   }
+
+   exactMatchOnly = conf.getBoolean("exact.match.only", false);
+
+   if (analyzer == null) {
+     String analyzerStr = conf.get("analyzer.class", WikipediaAnalyzer.class.getName());
+     analyzer = ClassUtils.instantiateAs(analyzerStr, Analyzer.class);
+   }
+
+   log.info("Configure: Input Categories size: {} Exact Match: {} Analyzer: {}",
+            new Object[] {inputCategories.size(), exactMatchOnly, analyzer.getClass().getName()});
+ }
+
+ /**
+  * Scans the document for {@code [[Category:...]]} links and returns the first configured
+  * category that one of them matches. In exact mode the lower-cased, trimmed link text
+  * itself is returned when it is one of the configured categories; otherwise the first
+  * configured category whose precompiled pattern matches the link text is returned.
+  *
+  * @return the matching category, or {@code "Unknown"} when no link matches
+  */
+ private String findMatchingCategory(String document) {
+   int searchFrom = 0;
+   for (int tagStart = document.indexOf("[[Category:", searchFrom);
+        tagStart != -1;
+        tagStart = document.indexOf("[[Category:", searchFrom)) {
+     // 11 == "[[Category:".length()
+     int nameStart = tagStart + 11;
+     int nameEnd = document.indexOf("]]", nameStart);
+     if (nameEnd < 0 || nameEnd >= document.length()) {
+       // unterminated link: nothing more to look at
+       break;
+     }
+     String category = document.substring(nameStart, nameEnd).toLowerCase(Locale.ENGLISH).trim();
+     if (exactMatchOnly) {
+       if (inputCategories.contains(category)) {
+         return category;
+       }
+     } else {
+       // inexact match with word boundary
+       for (int idx = 0; idx < inputCategories.size(); idx++) {
+         if (inputCategoryPatterns.get(idx).matcher(category).matches()) {
+           return inputCategories.get(idx);
+         }
+       }
+     }
+     searchFrom = nameEnd;
+   }
+   return "Unknown";
+ }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaDatasetCreatorReducer.java Sun May 13 21:55:26 2012
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Reducer;
+
+/**
+ * Pass-through reducer: every value received for a key is written out again under the
+ * same key. Can also be used as a local Combiner.
+ */
+public class WikipediaDatasetCreatorReducer extends Reducer<Text, Text, Text, Text> {
+
+  /**
+   * Writes each incoming value unchanged, paired with its key.
+   */
+  @Override
+  protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    // Nothing is aggregated here: one output record per input value.
+    for (Text entry : values) {
+      context.write(key, entry);
+    }
+  }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaMapper.java Sun May 13 21:55:26 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DefaultStringifier;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.GenericsUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maps over Wikipedia xml format and output all document having the category listed in the input category
+ * file
+ *
+ */
+public class WikipediaMapper extends Mapper<LongWritable, Text, Text, Text> {
+
+ private static final Logger log = LoggerFactory.getLogger(WikipediaMapper.class);
+
+ private static final Pattern SPACE_NON_ALPHA_PATTERN = Pattern.compile("[\\s]");
+
+ private static final String START_DOC = "<text xml:space=\"preserve\">";
+
+ private static final String END_DOC = "</text>";
+
+ private static final Pattern TITLE = Pattern.compile("<title>(.*)<\\/title>");
+
+ private static final String REDIRECT = "<redirect />";
+
+ private Set<String> inputCategories;
+
+ private boolean exactMatchOnly;
+
+ private boolean all;
+
+ /**
+  * Processes one Wikipedia {@code <page>} element. Redirect pages and pages whose text
+  * cannot be extracted are skipped; the latter are counted in the
+  * {@code Wikipedia / Parse errors} counter. Unless all files are selected, pages
+  * without a matching category are skipped as well. For every remaining page the title
+  * (whitespace replaced by {@code _}) is emitted as key and the HTML-unescaped page text
+  * as value.
+  */
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+
+   String content = value.toString();
+   if (content.contains(REDIRECT)) {
+     return;
+   }
+   String document;
+   String title;
+   try {
+     document = getDocument(content);
+     title = getTitle(content);
+   } catch (RuntimeException e) {
+     // Malformed page: skip it, but leave a trace in the job counters instead of
+     // dropping it silently.
+     context.getCounter("Wikipedia", "Parse errors").increment(1);
+     return;
+   }
+
+   if (!all && "Unknown".equals(findMatchingCategory(document))) {
+     return;
+   }
+   document = StringEscapeUtils.unescapeHtml(document);
+   context.write(new Text(SPACE_NON_ALPHA_PATTERN.matcher(title).replaceAll("_")), new Text(document));
+ }
+
+ /**
+  * Reads the mapper settings from the job configuration: the serialized category set
+  * ({@code wikipedia.categories}, defaulting to an empty set), the exact-match flag
+  * ({@code exact.match.only}, default false) and the select-all flag
+  * ({@code all.files}, default true).
+  */
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+   super.setup(context);
+   Configuration jobConf = context.getConfiguration();
+
+   if (inputCategories == null) {
+     Set<String> emptySet = new HashSet<String>();
+     DefaultStringifier<Set<String>> stringifier =
+         new DefaultStringifier<Set<String>>(jobConf, GenericsUtil.getClass(emptySet));
+     // Fall back to the serialized form of an empty set when nothing was configured.
+     String fallback = stringifier.toString(emptySet);
+     inputCategories = stringifier.fromString(jobConf.get("wikipedia.categories", fallback));
+   }
+
+   exactMatchOnly = jobConf.getBoolean("exact.match.only", false);
+   all = jobConf.getBoolean("all.files", true);
+
+   Object[] logArgs = {inputCategories.size(), all, exactMatchOnly};
+   log.info("Configure: Input Categories size: {} All: {} Exact Match: {}", logArgs);
+ }
+
+ /**
+  * Extracts the page text, i.e. everything between the opening
+  * {@code <text xml:space="preserve">} tag and the following {@code </text>} tag.
+  *
+  * @param xml
+  *          the XML of a single Wikipedia page
+  * @return the raw (still HTML-escaped) text of the page
+  * @throws IllegalArgumentException
+  *           if the opening or the closing text tag cannot be found
+  */
+ private static String getDocument(String xml) {
+   int tagStart = xml.indexOf(START_DOC);
+   if (tagStart < 0) {
+     // Without this check the -1 from indexOf would be turned into a bogus start offset
+     // and an arbitrary slice of the page could be returned as its text.
+     throw new IllegalArgumentException("Opening text tag not found: " + START_DOC);
+   }
+   int start = tagStart + START_DOC.length();
+   int end = xml.indexOf(END_DOC, start);
+   if (end < 0) {
+     throw new IllegalArgumentException("Closing text tag not found: " + END_DOC);
+   }
+   return xml.substring(start, end);
+ }
+
+ /**
+  * Returns the content of the first {@code <title>} element found in the given XML,
+  * or an empty string when there is none.
+  */
+ private static String getTitle(CharSequence xml) {
+   Matcher titleMatcher = TITLE.matcher(xml);
+   if (titleMatcher.find()) {
+     return titleMatcher.group(1);
+   }
+   return "";
+ }
+
+ /**
+  * Scans the document for {@code [[Category:...]]} links and returns the first configured
+  * category that one of them matches. In exact mode the lower-cased, trimmed link text
+  * itself is returned when it is one of the configured categories; otherwise the first
+  * configured category contained in the link text is returned.
+  *
+  * @return the matching category, or {@code "Unknown"} when no link matches
+  */
+ private String findMatchingCategory(String document) {
+   int searchFrom = 0;
+   for (int tagStart = document.indexOf("[[Category:", searchFrom);
+        tagStart != -1;
+        tagStart = document.indexOf("[[Category:", searchFrom)) {
+     // 11 == "[[Category:".length()
+     int nameStart = tagStart + 11;
+     int nameEnd = document.indexOf("]]", nameStart);
+     if (nameEnd < 0 || nameEnd >= document.length()) {
+       // unterminated link: nothing more to look at
+       break;
+     }
+     String category = document.substring(nameStart, nameEnd).toLowerCase(Locale.ENGLISH).trim();
+     if (exactMatchOnly) {
+       if (inputCategories.contains(category)) {
+         return category;
+       }
+     } else {
+       // we accept an inexact match: the configured category only has to be contained
+       for (String candidate : inputCategories) {
+         if (category.contains(candidate)) {
+           return candidate;
+         }
+       }
+     }
+     searchFrom = nameEnd;
+   }
+   return "Unknown";
+ }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/WikipediaXmlSplitter.java Sun May 13 21:55:26 2012
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.net.URI;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+
+import com.google.common.io.Closeables;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.builder.ArgumentBuilder;
+import org.apache.commons.cli2.builder.DefaultOptionBuilder;
+import org.apache.commons.cli2.builder.GroupBuilder;
+import org.apache.commons.cli2.commandline.Parser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.mahout.common.CommandLineUtil;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>Command-line tool that splits a Wikipedia XML dump into a series of smaller XML files
+ * ("chunks").</p>
+ *
+ * <p>The dump is read line by line. A dump file whose name ends in {@code .bz2} is decompressed on
+ * the fly; any other file is read as is. Every {@code <page>...</page>} element is copied whole
+ * into the current chunk, so a page is never split across two chunks. Each chunk starts with a fixed
+ * {@code <mediawiki>}/{@code <siteinfo>} header, ends with {@code </mediawiki>}, and is written to
+ * {@code <outputDir>/chunk-NNNN.xml} through the Hadoop {@link FileSystem} resolved from the output
+ * URI.</p>
+ *
+ * <p>Example invocation (from the Hadoop home; the jar name depends on your build):</p>
+ *
+ * <p>{@code bin/hadoop jar <mahout-job-jar>
+ * org.apache.mahout.text.wikipedia.WikipediaXmlSplitter
+ * -d /path/to/enwiki-latest-pages-articles.xml
+ * -o /path/to/wikipedia/chunks/ -c 64}</p>
+ */
+public final class WikipediaXmlSplitter {
+
+  private static final Logger log = LoggerFactory.getLogger(WikipediaXmlSplitter.class);
+
+  /** Fixed preamble written at the top of every chunk, before the first page. */
+  private static final String HEADER =
+      "<mediawiki xmlns=\"http://www.mediawiki.org/xml/export-0.3/\" "
+      + "xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" "
+      + "xsi:schemaLocation=\"http://www.mediawiki.org/xml/export-0.3/ "
+      + "http://www.mediawiki.org/xml/export-0.3.xsd\" "
+      + "version=\"0.3\" "
+      + "xml:lang=\"en\">\n"
+      + " <siteinfo>\n"
+      + "<sitename>Wikipedia</sitename>\n"
+      + " <base>http://en.wikipedia.org/wiki/Main_Page</base>\n"
+      + " <generator>MediaWiki 1.13alpha</generator>\n"
+      + " <case>first-letter</case>\n"
+      + " <namespaces>\n"
+      + " <namespace key=\"-2\">Media</namespace>\n"
+      + " <namespace key=\"-1\">Special</namespace>\n"
+      + " <namespace key=\"0\" />\n"
+      + " <namespace key=\"1\">Talk</namespace>\n"
+      + " <namespace key=\"2\">User</namespace>\n"
+      + " <namespace key=\"3\">User talk</namespace>\n"
+      + " <namespace key=\"4\">Wikipedia</namespace>\n"
+      + " <namespace key=\"5\">Wikipedia talk</namespace>\n"
+      + " <namespace key=\"6\">Image</namespace>\n"
+      + " <namespace key=\"7\">Image talk</namespace>\n"
+      + " <namespace key=\"8\">MediaWiki</namespace>\n"
+      + " <namespace key=\"9\">MediaWiki talk</namespace>\n"
+      + " <namespace key=\"10\">Template</namespace>\n"
+      + " <namespace key=\"11\">Template talk</namespace>\n"
+      + " <namespace key=\"12\">Help</namespace>\n"
+      + " <namespace key=\"13\">Help talk</namespace>\n"
+      + " <namespace key=\"14\">Category</namespace>\n"
+      + " <namespace key=\"15\">Category talk</namespace>\n"
+      + " <namespace key=\"100\">Portal</namespace>\n"
+      + " <namespace key=\"101\">Portal talk</namespace>\n"
+      + " </namespaces>\n"
+      + " </siteinfo>\n";
+
+  private WikipediaXmlSplitter() { }
+
+  /**
+   * Parses the command line and splits the dump.
+   *
+   * <p>Options: {@code -d/--dumpFile} (required), {@code -o/--outputDir} (required),
+   * {@code -c/--chunkSize} in megabytes (required), {@code -n/--numChunks} (optional upper bound on
+   * the number of chunks written), {@code -i/--s3ID} and {@code -s/--s3Secret} (optional S3
+   * credentials). If the options cannot be parsed, the error is logged, the help text is printed and
+   * the method returns without doing any work.</p>
+   *
+   * @param args the command-line arguments
+   * @throws IOException if the dump cannot be read or a chunk cannot be written
+   */
+  public static void main(String[] args) throws IOException {
+    DefaultOptionBuilder obuilder = new DefaultOptionBuilder();
+    ArgumentBuilder abuilder = new ArgumentBuilder();
+    GroupBuilder gbuilder = new GroupBuilder();
+
+    Option dumpFileOpt = obuilder.withLongName("dumpFile").withRequired(true).withArgument(
+      abuilder.withName("dumpFile").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The path to the wikipedia dump file (.bz2 or uncompressed)").withShortName("d").create();
+
+    Option outputDirOpt = obuilder.withLongName("outputDir").withRequired(true).withArgument(
+      abuilder.withName("outputDir").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The output directory to place the splits in:\n"
+          + "local files:\n\t/var/data/wikipedia-xml-chunks or\n\tfile:///var/data/wikipedia-xml-chunks\n"
+          + "Hadoop DFS:\n\thdfs://wikipedia-xml-chunks\n"
+          + "AWS S3 (blocks):\n\ts3://bucket-name/wikipedia-xml-chunks\n"
+          + "AWS S3 (native files):\n\ts3n://bucket-name/wikipedia-xml-chunks\n")
+
+    .withShortName("o").create();
+
+    Option s3IdOpt = obuilder.withLongName("s3ID").withRequired(false).withArgument(
+      abuilder.withName("s3Id").withMinimum(1).withMaximum(1).create()).withDescription("Amazon S3 ID key")
+        .withShortName("i").create();
+    Option s3SecretOpt = obuilder.withLongName("s3Secret").withRequired(false).withArgument(
+      abuilder.withName("s3Secret").withMinimum(1).withMaximum(1).create()).withDescription(
+      "Amazon S3 secret key").withShortName("s").create();
+
+    Option chunkSizeOpt = obuilder.withLongName("chunkSize").withRequired(true).withArgument(
+      abuilder.withName("chunkSize").withMinimum(1).withMaximum(1).create()).withDescription(
+      "The Size of the chunk, in megabytes").withShortName("c").create();
+    Option numChunksOpt = obuilder
+        .withLongName("numChunks")
+        .withRequired(false)
+        .withArgument(abuilder.withName("numChunks").withMinimum(1).withMaximum(1).create())
+        .withDescription(
+          "The maximum number of chunks to create. If specified, program will only create a subset of the chunks")
+        .withShortName("n").create();
+    Group group = gbuilder.withName("Options").withOption(dumpFileOpt).withOption(outputDirOpt).withOption(
+      chunkSizeOpt).withOption(numChunksOpt).withOption(s3IdOpt).withOption(s3SecretOpt).create();
+
+    Parser parser = new Parser();
+    parser.setGroup(group);
+    CommandLine cmdLine;
+    try {
+      cmdLine = parser.parse(args);
+    } catch (OptionException e) {
+      log.error("Error while parsing options", e);
+      CommandLineUtil.printHelp(group);
+      return;
+    }
+
+    Configuration conf = new Configuration();
+    String dumpFilePath = (String) cmdLine.getValue(dumpFileOpt);
+    String outputDirPath = (String) cmdLine.getValue(outputDirOpt);
+
+    if (cmdLine.hasOption(s3IdOpt)) {
+      String id = (String) cmdLine.getValue(s3IdOpt);
+      conf.set("fs.s3n.awsAccessKeyId", id);
+      conf.set("fs.s3.awsAccessKeyId", id);
+    }
+    if (cmdLine.hasOption(s3SecretOpt)) {
+      String secret = (String) cmdLine.getValue(s3SecretOpt);
+      conf.set("fs.s3n.awsSecretAccessKey", secret);
+      conf.set("fs.s3.awsSecretAccessKey", secret);
+    }
+    // do not compute crc file when using local FS
+    conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem");
+    FileSystem fs = FileSystem.get(URI.create(outputDirPath), conf);
+
+    // Computed as a long: 1024 * 1024 * n overflows int for n >= 2048.
+    // The threshold is compared against a character count, so "megabytes" is approximate.
+    long chunkSize = 1024L * 1024L * Integer.parseInt((String) cmdLine.getValue(chunkSizeOpt));
+
+    int numChunks = Integer.MAX_VALUE;
+    if (cmdLine.hasOption(numChunksOpt)) {
+      numChunks = Integer.parseInt((String) cmdLine.getValue(numChunksOpt));
+    }
+
+    NumberFormat decimalFormatter = new DecimalFormat("0000");
+    File dumpFile = new File(dumpFilePath);
+    FileLineIterator it;
+    if (dumpFilePath.endsWith(".bz2")) {
+      // default compression format from http://download.wikimedia.org
+      CompressionCodec codec = new BZip2Codec();
+      it = new FileLineIterator(codec.createInputStream(new FileInputStream(dumpFile)));
+    } else {
+      // assume the user has previously de-compressed the dump file
+      it = new FileLineIterator(dumpFile);
+    }
+
+    try {
+      StringBuilder content = new StringBuilder(HEADER);
+      // true while 'content' holds at least one page that has not been written to a chunk yet
+      boolean pendingPages = false;
+      int filenumber = 0;
+      while (it.hasNext()) {
+        String thisLine = it.next();
+        if (!thisLine.trim().startsWith("<page>")) {
+          continue; // everything outside <page> elements is dropped
+        }
+        boolean end = false;
+        while (!thisLine.trim().startsWith("</page>")) {
+          content.append(thisLine).append('\n');
+          if (it.hasNext()) {
+            thisLine = it.next();
+          } else {
+            end = true; // dump ended in the middle of a page
+            break;
+          }
+        }
+        if (!end) {
+          // the closing </page> line; when the dump is truncated the last line is already appended
+          content.append(thisLine).append('\n');
+        }
+        pendingPages = true;
+
+        if (content.length() > chunkSize || end) {
+          filenumber++;
+          writeChunk(fs, outputDirPath, decimalFormatter.format(filenumber), content);
+          pendingPages = false;
+          if (filenumber >= numChunks) {
+            break;
+          }
+          content = new StringBuilder(HEADER);
+        }
+      }
+      if (pendingPages) {
+        // Pages gathered after the last full chunk: without this final write they would be lost.
+        filenumber++;
+        writeChunk(fs, outputDirPath, decimalFormatter.format(filenumber), content);
+      }
+    } finally {
+      // releases the dump file when we stop early (numChunks reached) or fail part-way
+      Closeables.closeQuietly(it);
+    }
+  }
+
+  /**
+   * Terminates the chunk with {@code </mediawiki>} and writes it, UTF-8 encoded, to
+   * {@code <outputDirPath>/chunk-<chunkId>.xml}.
+   */
+  private static void writeChunk(FileSystem fs, String outputDirPath, String chunkId, StringBuilder content)
+    throws IOException {
+    content.append("</mediawiki>");
+    String filename = outputDirPath + "/chunk-" + chunkId + ".xml";
+    BufferedWriter chunkWriter =
+        new BufferedWriter(new OutputStreamWriter(fs.create(new Path(filename)), "UTF-8"));
+    try {
+      chunkWriter.write(content.toString(), 0, content.length());
+      // Close explicitly so that a failed flush is reported instead of silently truncating the chunk.
+      chunkWriter.close();
+    } finally {
+      // no-op after a successful close; only cleans up when the write above failed
+      Closeables.closeQuietly(chunkWriter);
+    }
+  }
+}
Added: mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java?rev=1337989&view=auto
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java (added)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java Sun May 13 21:55:26 2012
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.text.wikipedia;
+
+import java.io.IOException;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads records that are delimited by a specific begin/end tag.
+ *
+ * <p>The tags are taken from the job configuration under {@link #START_TAG_KEY} and
+ * {@link #END_TAG_KEY}. Each record value is the byte range from a start tag up to and including
+ * the next end tag.</p>
+ */
+public class XmlInputFormat extends TextInputFormat {
+
+  private static final Logger log = LoggerFactory.getLogger(XmlInputFormat.class);
+
+  public static final String START_TAG_KEY = "xmlinput.start";
+  public static final String END_TAG_KEY = "xmlinput.end";
+
+  /**
+   * Creates an {@link XmlRecordReader} for the given split.
+   *
+   * @throws IllegalStateException if the split's file cannot be opened; the {@link IOException} is
+   *         kept as the cause (returning {@code null} here would only fail later, with less context)
+   */
+  @Override
+  public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
+    try {
+      return new XmlRecordReader((FileSplit) split, context.getConfiguration());
+    } catch (IOException ioe) {
+      log.warn("Error while creating XmlRecordReader", ioe);
+      throw new IllegalStateException("Error while creating XmlRecordReader", ioe);
+    }
+  }
+
+  /**
+   * XMLRecordReader class to read through a given xml document to output xml blocks as records as specified
+   * by the start tag and end tag
+   *
+   */
+  public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
+
+    private final byte[] startTag;
+    private final byte[] endTag;
+    // Knuth-Morris-Pratt fallback tables for the two tags
+    private final int[] startTagFailure;
+    private final int[] endTagFailure;
+    private final long start;
+    private final long end;
+    private final FSDataInputStream fsin;
+    private final DataOutputBuffer buffer = new DataOutputBuffer();
+    private LongWritable currentKey;
+    private Text currentValue;
+
+    /**
+     * Opens the split's file and positions the stream at the start of the split.
+     *
+     * @param split the portion of the file this reader is responsible for
+     * @param conf configuration holding the start and end tags
+     * @throws IllegalArgumentException if either tag is missing from the configuration or empty
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    public XmlRecordReader(FileSplit split, Configuration conf) throws IOException {
+      startTag = readTag(conf, START_TAG_KEY);
+      endTag = readTag(conf, END_TAG_KEY);
+      startTagFailure = buildFailureTable(startTag);
+      endTagFailure = buildFailureTable(endTag);
+
+      // open the file and seek to the start of the split
+      start = split.getStart();
+      end = start + split.getLength();
+      Path file = split.getPath();
+      FileSystem fs = file.getFileSystem(conf);
+      FSDataInputStream in = fs.open(split.getPath());
+      boolean positioned = false;
+      try {
+        in.seek(start);
+        positioned = true;
+      } finally {
+        if (!positioned) {
+          // do not leak the stream when the seek fails
+          Closeables.closeQuietly(in);
+        }
+      }
+      fsin = in;
+    }
+
+    /** Fetches a tag from the configuration as UTF-8 bytes, rejecting a missing or empty value. */
+    private static byte[] readTag(Configuration conf, String key) {
+      String tag = conf.get(key);
+      if (tag == null || tag.length() == 0) {
+        throw new IllegalArgumentException("Missing or empty configuration value for " + key);
+      }
+      return tag.getBytes(Charsets.UTF_8);
+    }
+
+    /**
+     * Builds the KMP failure table: entry {@code j} is the length of the longest proper prefix of
+     * {@code pattern[0..j]} that is also a suffix of it.
+     */
+    private static int[] buildFailureTable(byte[] pattern) {
+      int[] failure = new int[pattern.length];
+      int k = 0;
+      for (int j = 1; j < pattern.length; j++) {
+        while (k > 0 && pattern[j] != pattern[k]) {
+          k = failure[k - 1];
+        }
+        if (pattern[j] == pattern[k]) {
+          k++;
+        }
+        failure[j] = k;
+      }
+      return failure;
+    }
+
+    /**
+     * Reads the next record, if any, into the supplied key and value.
+     *
+     * <p>The key is set to the stream position just after the record's end tag; the value is the
+     * record's bytes from the start tag through the end tag.</p>
+     *
+     * @return {@code true} if a complete record was read
+     */
+    private boolean next(LongWritable key, Text value) throws IOException {
+      if (fsin.getPos() < end && readUntilMatch(startTag, startTagFailure, false)) {
+        try {
+          buffer.write(startTag);
+          if (readUntilMatch(endTag, endTagFailure, true)) {
+            key.set(fsin.getPos());
+            value.set(buffer.getData(), 0, buffer.getLength());
+            return true;
+          }
+        } finally {
+          buffer.reset();
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+      Closeables.closeQuietly(fsin);
+    }
+
+    /**
+     * @return the fraction of the split consumed so far, between 0 and 1; 0 for an empty split
+     */
+    @Override
+    public float getProgress() throws IOException {
+      if (end == start) {
+        return 0.0f; // avoid dividing by zero on an empty split
+      }
+      // A record that begins inside the split is read to its end tag, possibly past 'end'; clamp.
+      return Math.min(1.0f, (fsin.getPos() - start) / (float) (end - start));
+    }
+
+    /**
+     * Consumes bytes from the stream until {@code match} has been read in full.
+     *
+     * @param match the tag to look for
+     * @param failure the KMP failure table of {@code match}
+     * @param withinBlock when {@code true}, every byte read is also appended to the record buffer and
+     *        the search may continue beyond the end of the split
+     * @return {@code true} if the tag was found; {@code false} at end of file or, when not within a
+     *         block, once the split's end is reached with no partial match in progress
+     */
+    private boolean readUntilMatch(byte[] match, int[] failure, boolean withinBlock) throws IOException {
+      int i = 0;
+      while (true) {
+        int b = fsin.read();
+        // end of file:
+        if (b == -1) {
+          return false;
+        }
+        // save to buffer:
+        if (withinBlock) {
+          buffer.write(b);
+        }
+
+        // check if we're matching; read() yields 0..255, so the signed tag bytes are masked to compare.
+        // On a mismatch fall back to the longest prefix that still fits rather than restarting from
+        // scratch, so input such as "<<page>" still matches "<page>".
+        while (i > 0 && b != (match[i] & 0xFF)) {
+          i = failure[i - 1];
+        }
+        if (b == (match[i] & 0xFF)) {
+          i++;
+          if (i >= match.length) {
+            return true;
+          }
+        }
+        // see if we've passed the stop point:
+        if (!withinBlock && i == 0 && fsin.getPos() >= end) {
+          return false;
+        }
+      }
+    }
+
+    @Override
+    public LongWritable getCurrentKey() throws IOException, InterruptedException {
+      return currentKey;
+    }
+
+    @Override
+    public Text getCurrentValue() throws IOException, InterruptedException {
+      return currentValue;
+    }
+
+    /** Intentionally empty: the constructor has already opened and positioned the stream. */
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      currentKey = new LongWritable();
+      currentValue = new Text();
+      return next(currentKey, currentValue);
+    }
+  }
+}