Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:51 UTC
[35/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java b/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java
new file mode 100644
index 0000000..54ec543
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/DmozParser.java
@@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.*;
+import java.util.*;
+import java.util.regex.*;
+
+import javax.xml.parsers.*;
+import org.xml.sax.*;
+import org.xml.sax.helpers.*;
+import org.apache.xerces.util.XMLChar;
+
+// Slf4j Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.util.NutchConfiguration;
+
+/** Utility that converts DMOZ RDF into a flat file of URLs to be injected. */
+public class DmozParser {
+ public static final Logger LOG = LoggerFactory.getLogger(DmozParser.class);
+
+ long pages = 0;
+
+ /**
+ * This filter fixes characters that might offend our parser. This lets us be
+ * tolerant of errors that might appear in the input XML.
+ */
+ private static class XMLCharFilter extends FilterReader {
+ private boolean lastBad = false;
+
+ public XMLCharFilter(Reader reader) {
+ super(reader);
+ }
+
+ public int read() throws IOException {
+ int c = in.read();
+ int value = c;
+ if (c != -1 && !(XMLChar.isValid(c))) // fix invalid characters
+ value = 'X';
+ else if (lastBad && c == '<') { // fix mis-matched brackets
+ in.mark(1);
+ if (in.read() != '/')
+ value = 'X';
+ in.reset();
+ }
+ lastBad = (c == 65533);
+
+ return value;
+ }
+
+ public int read(char[] cbuf, int off, int len) throws IOException {
+ int n = in.read(cbuf, off, len);
+ if (n != -1) {
+ for (int i = 0; i < n; i++) {
+ char c = cbuf[off + i];
+ char value = c;
+ if (!(XMLChar.isValid(c))) // fix invalid characters
+ value = 'X';
+ else if (lastBad && c == '<') { // fix mis-matched brackets
+ if (i != n - 1 && cbuf[off + i + 1] != '/')
+ value = 'X';
+ }
+ lastBad = (c == 65533);
+ cbuf[off + i] = value;
+ }
+ }
+ return n;
+ }
+ }
+
+ /**
+ * The RDFProcessor receives tag messages during a parse of RDF XML data. We
+ * build whatever structures we need from these messages.
+ */
+ private class RDFProcessor extends DefaultHandler {
+ String curURL = null, curSection = null;
+ boolean titlePending = false, descPending = false,
+ insideAdultSection = false;
+ Pattern topicPattern = null;
+ StringBuffer title = new StringBuffer(), desc = new StringBuffer();
+ XMLReader reader;
+ int subsetDenom;
+ int hashSkew;
+ boolean includeAdult;
+ Locator location;
+
+ /**
+ * Pass in an XMLReader, plus a flag as to whether we should include adult
+ * material.
+ */
+ public RDFProcessor(XMLReader reader, int subsetDenom,
+ boolean includeAdult, int skew, Pattern topicPattern)
+ throws IOException {
+ this.reader = reader;
+ this.subsetDenom = subsetDenom;
+ this.includeAdult = includeAdult;
+ this.topicPattern = topicPattern;
+
+ this.hashSkew = skew != 0 ? skew : new Random().nextInt();
+ }
+
+ //
+ // Interface ContentHandler
+ //
+
+ /**
+ * Start of an XML elt
+ */
+ public void startElement(String namespaceURI, String localName,
+ String qName, Attributes atts) throws SAXException {
+ if ("Topic".equals(qName)) {
+ curSection = atts.getValue("r:id");
+ } else if ("ExternalPage".equals(qName)) {
+ // Porn filter
+ if ((!includeAdult) && curSection.startsWith("Top/Adult")) {
+ return;
+ }
+
+ if (topicPattern != null && !topicPattern.matcher(curSection).matches()) {
+ return;
+ }
+
+ // Subset denominator filter.
+ // Only emit with a chance of 1/denominator.
+ String url = atts.getValue("about");
+ int hashValue = MD5Hash.digest(url).hashCode();
+ hashValue = Math.abs(hashValue ^ hashSkew);
+ if ((hashValue % subsetDenom) != 0) {
+ return;
+ }
+
+ // We actually claim the URL!
+ curURL = url;
+ } else if (curURL != null && "d:Title".equals(qName)) {
+ titlePending = true;
+ } else if (curURL != null && "d:Description".equals(qName)) {
+ descPending = true;
+ }
+ }
+
+ /**
+ * The contents of an XML elt
+ */
+ public void characters(char ch[], int start, int length) {
+ if (titlePending) {
+ title.append(ch, start, length);
+ } else if (descPending) {
+ desc.append(ch, start, length);
+ }
+ }
+
+ /**
+ * Termination of XML elt
+ */
+ public void endElement(String namespaceURI, String localName, String qName)
+ throws SAXException {
+ if (curURL != null) {
+ if ("ExternalPage".equals(qName)) {
+ //
+ // Inc the number of pages, insert the page, and
+ // possibly print status.
+ //
+ System.out.println(curURL);
+ pages++;
+
+ //
+ // Clear out the link text. This is what
+ // you would use for adding to the linkdb.
+ //
+ if (title.length() > 0) {
+ title.delete(0, title.length());
+ }
+ if (desc.length() > 0) {
+ desc.delete(0, desc.length());
+ }
+
+ // Null out the URL.
+ curURL = null;
+ } else if ("d:Title".equals(qName)) {
+ titlePending = false;
+ } else if ("d:Description".equals(qName)) {
+ descPending = false;
+ }
+ }
+ }
+
+ /**
+ * When parsing begins
+ */
+ public void startDocument() {
+ LOG.info("Begin parse");
+ }
+
+ /**
+ * When parsing ends
+ */
+ public void endDocument() {
+ LOG.info("Completed parse. Found " + pages + " pages.");
+ }
+
+ /**
+ * From time to time the Parser will set the "current location" by calling
+ * this function. It's useful for emitting locations for error messages.
+ */
+ public void setDocumentLocator(Locator locator) {
+ location = locator;
+ }
+
+ //
+ // Interface ErrorHandler
+ //
+
+ /**
+ * Emit the exception message
+ */
+ public void error(SAXParseException spe) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Error: " + spe.toString() + ": " + spe.getMessage());
+ }
+ }
+
+ /**
+ * Emit the exception message, with line numbers
+ */
+ public void fatalError(SAXParseException spe) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Fatal err: " + spe.toString() + ": " + spe.getMessage());
+ LOG.error("Last known line is " + location.getLineNumber()
+ + ", column " + location.getColumnNumber());
+ }
+ }
+
+ /**
+ * Emit exception warning message
+ */
+ public void warning(SAXParseException spe) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Warning: " + spe.toString() + ": " + spe.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Iterate through all the items in this structured DMOZ file and emit each
+ * qualifying URL so it can later be injected into the CrawlDb.
+ */
+ public void parseDmozFile(File dmozFile, int subsetDenom,
+ boolean includeAdult, int skew, Pattern topicPattern)
+
+ throws IOException, SAXException, ParserConfigurationException {
+
+ SAXParserFactory parserFactory = SAXParserFactory.newInstance();
+ SAXParser parser = parserFactory.newSAXParser();
+ XMLReader reader = parser.getXMLReader();
+
+ // Create our own processor to receive SAX events
+ RDFProcessor rp = new RDFProcessor(reader, subsetDenom, includeAdult, skew,
+ topicPattern);
+ reader.setContentHandler(rp);
+ reader.setErrorHandler(rp);
+ LOG.info("skew = " + rp.hashSkew);
+
+ //
+ // Open filtered text stream. The TextFilter makes sure that
+ // only appropriate XML-approved Text characters are received.
+ // Any non-conforming characters are silently skipped.
+ //
+ XMLCharFilter in = new XMLCharFilter(new BufferedReader(
+ new InputStreamReader(new BufferedInputStream(new FileInputStream(
+ dmozFile)), "UTF-8")));
+ try {
+ InputSource is = new InputSource(in);
+ reader.parse(is);
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error(e.toString());
+ }
+ System.exit(0);
+ } finally {
+ in.close();
+ }
+ }
+
+ private static void addTopicsFromFile(String topicFile, Vector<String> topics)
+ throws IOException {
+ BufferedReader in = null;
+ try {
+ in = new BufferedReader(new InputStreamReader(new FileInputStream(
+ topicFile), "UTF-8"));
+ String line = null;
+ while ((line = in.readLine()) != null) {
+ topics.addElement(new String(line));
+ }
+ } catch (Exception e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error(e.toString());
+ }
+ System.exit(0);
+ } finally {
+ in.close();
+ }
+ }
+
+ /**
+ * Command-line access. User may add URLs via a flat text file or the
+ * structured DMOZ file. By default, we ignore Adult material (as categorized
+ * by DMOZ).
+ */
+ public static void main(String argv[]) throws Exception {
+ if (argv.length < 1) {
+ System.err
+ .println("Usage: DmozParser <dmoz_file> [-subset <subsetDenominator>] [-includeAdultMaterial] [-skew skew] [-topicFile <topic list file>] [-topic <topic> [-topic <topic> [...]]]");
+ return;
+ }
+
+ //
+ // Parse the command line, figure out what kind of
+ // URL file we need to load
+ //
+ int subsetDenom = 1;
+ int skew = 0;
+ String dmozFile = argv[0];
+ boolean includeAdult = false;
+ Pattern topicPattern = null;
+ Vector<String> topics = new Vector<String>();
+
+ Configuration conf = NutchConfiguration.create();
+ FileSystem fs = FileSystem.get(conf);
+ try {
+ for (int i = 1; i < argv.length; i++) {
+ if ("-includeAdultMaterial".equals(argv[i])) {
+ includeAdult = true;
+ } else if ("-subset".equals(argv[i])) {
+ subsetDenom = Integer.parseInt(argv[i + 1]);
+ i++;
+ } else if ("-topic".equals(argv[i])) {
+ topics.addElement(argv[i + 1]);
+ i++;
+ } else if ("-topicFile".equals(argv[i])) {
+ addTopicsFromFile(argv[i + 1], topics);
+ i++;
+ } else if ("-skew".equals(argv[i])) {
+ skew = Integer.parseInt(argv[i + 1]);
+ i++;
+ }
+ }
+
+ DmozParser parser = new DmozParser();
+
+ if (!topics.isEmpty()) {
+ String regExp = new String("^(");
+ int j = 0;
+ for (; j < topics.size() - 1; ++j) {
+ regExp = regExp.concat(topics.get(j));
+ regExp = regExp.concat("|");
+ }
+ regExp = regExp.concat(topics.get(j));
+ regExp = regExp.concat(").*");
+ LOG.info("Topic selection pattern = " + regExp);
+ topicPattern = Pattern.compile(regExp);
+ }
+
+ parser.parseDmozFile(new File(dmozFile), subsetDenom, includeAdult, skew,
+ topicPattern);
+
+ } finally {
+ fs.close();
+ }
+ }
+
+}
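
For illustration only (this snippet is not part of the files above), a minimal sketch of driving the parser programmatically; the RDF file path is hypothetical and the arguments mirror the defaults used by main(): subset denominator 1 (keep every URL), exclude adult sections, no hash skew, no topic filter. Matching URLs are printed to standard output.

    import java.io.File;
    import org.apache.nutch.tools.DmozParser;

    public class DmozParserSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical path to a DMOZ RDF dump; output is one URL per line on stdout.
        new DmozParser().parseDmozFile(new File("content.rdf.u8"), 1, false, 0, null);
      }
    }
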
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java b/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java
new file mode 100644
index 0000000..b7c1805
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/FileDumper.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+//JDK imports
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileOutputStream;
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import com.google.common.base.Strings;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+//Commons imports
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.codec.digest.DigestUtils;
+
+//Hadoop
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.DumpFileUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.TableUtil;
+
+//Tika imports
+import org.apache.tika.Tika;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * The file dumper tool enables one to reverse generate the raw content from
+ * Nutch segment data directories.
+ * </p>
+ * <p>
+ * The tool has a number of immediate uses:
+ * <ol>
+ * <li>one can see what a page looked like at the time it was crawled</li>
+ * <li>one can see different media types acquired as part of the crawl</li>
+ * <li>it enables us to see webpages before we augment them with additional
+ * metadata, which can be handy for providing a provenance trail for your
+ * crawl data.</li>
+ * </ol>
+ * </p>
+ * <p>
+ * Upon successful completion the tool displays a very convenient JSON snippet
+ * detailing the mimetype classifications and the counts of documents which fall
+ * into those classifications. An example is as follows:
+ * </p>
+ *
+ * <pre>
+ * {@code
+ * INFO: File Types:
+ * TOTAL Stats:
+ * [
+ * {"mimeType":"application/xml","count":"19"}
+ * {"mimeType":"image/png","count":"47"}
+ * {"mimeType":"image/jpeg","count":"141"}
+ * {"mimeType":"image/vnd.microsoft.icon","count":"4"}
+ * {"mimeType":"text/plain","count":"89"}
+ * {"mimeType":"video/quicktime","count":"2"}
+ * {"mimeType":"image/gif","count":"63"}
+ * {"mimeType":"application/xhtml+xml","count":"1670"}
+ * {"mimeType":"application/octet-stream","count":"40"}
+ * {"mimeType":"text/html","count":"1863"}
+ * ]
+ *
+ * FILTER Stats:
+ * [
+ * {"mimeType":"image/png","count":"47"}
+ * {"mimeType":"image/jpeg","count":"141"}
+ * {"mimeType":"image/vnd.microsoft.icon","count":"4"}
+ * {"mimeType":"video/quicktime","count":"2"}
+ * {"mimeType":"image/gif","count":"63"}
+ * ]
+ * }
+ * </pre>
+ * <p>
+ * In the case above, the tool would have been run with the <b>-mimetype
+ * image/png image/jpeg image/vnd.microsoft.icon video/quicktime image/gif</b>
+ * flag and corresponding values activated.
+ *
+ */
+public class FileDumper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileDumper.class
+ .getName());
+
+ /**
+ * Dumps the raw content recovered from the provided segment directories. The
+ * argument may be a parent directory containing more than one segment, or a
+ * single segment.
+ *
+ * @param outputDir
+ * the directory you wish to dump the raw content to. This directory
+ * will be created.
+ * @param segmentRootDir
+ * a directory containing one or more segments.
+ * @param mimeTypes
+ * an array of mime types we have to dump, all others will be
+ * filtered out.
+ * @param flatDir
+ * a boolean flag specifying whether the output directory should contain
+ * only files instead of using nested directories to prevent naming
+ * conflicts.
+ * @param mimeTypeStats
+ * a flag indicating whether mimetype stats should be displayed
+ * instead of dumping files.
+ * @throws Exception
+ */
+ public void dump(File outputDir, File segmentRootDir, String[] mimeTypes, boolean flatDir, boolean mimeTypeStats, boolean reverseURLDump)
+ throws Exception {
+ if (mimeTypes == null)
+ LOG.info("Accepting all mimetypes.");
+ // total file counts
+ Map<String, Integer> typeCounts = new HashMap<String, Integer>();
+ // filtered file counts
+ Map<String, Integer> filteredCounts = new HashMap<String, Integer>();
+ Configuration conf = NutchConfiguration.create();
+ FileSystem fs = FileSystem.get(conf);
+ int fileCount = 0;
+ File[] segmentDirs = segmentRootDir.listFiles(new FileFilter() {
+
+ @Override
+ public boolean accept(File file) {
+ return file.canRead() && file.isDirectory();
+ }
+ });
+ if (segmentDirs == null) {
+ LOG.error("No segment directories found in ["
+ + segmentRootDir.getAbsolutePath() + "]");
+ return;
+ }
+
+ for (File segment : segmentDirs) {
+ LOG.info("Processing segment: [" + segment.getAbsolutePath() + "]");
+ DataOutputStream doutputStream = null;
+
+ File segmentDir = new File(segment.getAbsolutePath(), Content.DIR_NAME);
+ File[] partDirs = segmentDir.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(File file) {
+ return file.canRead() && file.isDirectory();
+ }
+ });
+
+ if (partDirs == null) {
+ LOG.warn("Skipping Corrupt Segment: [{}]", segment.getAbsolutePath());
+ continue;
+ }
+
+ for (File partDir : partDirs) {
+ try {
+ String segmentPath = partDir + "/data";
+ Path file = new Path(segmentPath);
+ if (!new File(file.toString()).exists()) {
+ LOG.warn("Skipping segment: [" + segmentPath
+ + "]: no data directory present");
+ continue;
+ }
+
+ SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
+
+ Writable key = (Writable) reader.getKeyClass().newInstance();
+ Content content = null;
+
+ while (reader.next(key)) {
+ content = new Content();
+ reader.getCurrentValue(content);
+ String url = key.toString();
+ String baseName = FilenameUtils.getBaseName(url);
+ String extension = FilenameUtils.getExtension(url);
+ if (extension == null || (extension != null && extension.equals(""))) {
+ extension = "html";
+ }
+
+ String filename = baseName + "." + extension;
+ ByteArrayInputStream bas = null;
+ Boolean filter = false;
+ try {
+ bas = new ByteArrayInputStream(content.getContent());
+ String mimeType = new Tika().detect(content.getContent());
+ collectStats(typeCounts, mimeType);
+ if (mimeType != null) {
+ if (mimeTypes == null
+ || Arrays.asList(mimeTypes).contains(mimeType)) {
+ collectStats(filteredCounts, mimeType);
+ filter = true;
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOG.warn("Tika is unable to detect type for: [" + url + "]");
+ } finally {
+ if (bas != null) {
+ try {
+ bas.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+
+ if (filter) {
+ if (!mimeTypeStats) {
+ String md5Ofurl = DumpFileUtil.getUrlMD5(url);
+
+ String fullDir = outputDir.getAbsolutePath();
+ if (!flatDir && !reverseURLDump) {
+ fullDir = DumpFileUtil.createTwoLevelsDirectory(fullDir, md5Ofurl);
+ }
+
+ if (!Strings.isNullOrEmpty(fullDir)) {
+ String outputFullPath;
+
+ if (reverseURLDump) {
+ String[] reversedURL = TableUtil.reverseUrl(url).split(":");
+ reversedURL[0] = reversedURL[0].replace('.', '/');
+
+ String reversedURLPath = reversedURL[0] + "/" + DigestUtils.sha256Hex(url).toUpperCase();
+ outputFullPath = String.format("%s/%s", fullDir, reversedURLPath);
+
+ // We'll drop the trailing file name and create the nested structure if it doesn't already exist.
+ String[] splitPath = outputFullPath.split("/");
+ File fullOutputDir = new File(org.apache.commons.lang3.StringUtils.join(Arrays.copyOf(splitPath, splitPath.length - 1), "/"));
+
+ if (!fullOutputDir.exists()) {
+ fullOutputDir.mkdirs();
+ }
+ } else {
+ outputFullPath = String.format("%s/%s", fullDir, DumpFileUtil.createFileName(md5Ofurl, baseName, extension));
+ }
+
+ File outputFile = new File(outputFullPath);
+
+ if (!outputFile.exists()) {
+ LOG.info("Writing: [" + outputFullPath + "]");
+
+ // Modified to prevent FileNotFoundException (Invalid Argument)
+ FileOutputStream output = null;
+ try {
+ output = new FileOutputStream(outputFile);
+ IOUtils.write(content.getContent(), output);
+ }
+ catch (Exception e) {
+ LOG.warn("Write Error: [" + outputFullPath + "]");
+ e.printStackTrace();
+ }
+ finally {
+ if (output != null) {
+ output.flush();
+ try {
+ output.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ fileCount++;
+ } else {
+ LOG.info("Skipping writing: [" + outputFullPath
+ + "]: file already exists");
+ }
+ }
+ }
+ }
+ }
+ reader.close();
+ } finally {
+ fs.close();
+ if (doutputStream != null) {
+ try {
+ doutputStream.close();
+ } catch (Exception ignore) {
+ }
+ }
+ }
+ }
+ }
+ LOG.info("Dumper File Stats: "
+ + DumpFileUtil.displayFileTypes(typeCounts, filteredCounts));
+
+ if (mimeTypeStats) {
+ System.out.println("Dumper File Stats: "
+ + DumpFileUtil.displayFileTypes(typeCounts, filteredCounts));
+ }
+ }
+
+ /**
+ * Main method for invoking this tool
+ *
+ * @param args
+ * 1) output directory (which will be created) to host the raw data
+ * and 2) a directory containing one or more segments.
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ // boolean options
+ Option helpOpt = new Option("h", "help", false, "show this help message");
+ // argument options
+ @SuppressWarnings("static-access")
+ Option outputOpt = OptionBuilder
+ .withArgName("outputDir")
+ .hasArg()
+ .withDescription(
+ "output directory (which will be created) to host the raw data")
+ .create("outputDir");
+ @SuppressWarnings("static-access")
+ Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+ .withDescription("the segment(s) to use").create("segment");
+ @SuppressWarnings("static-access")
+ Option mimeOpt = OptionBuilder
+ .withArgName("mimetype")
+ .hasArgs()
+ .withDescription(
+ "an optional list of mimetypes to dump, excluding all others. Defaults to all.")
+ .create("mimetype");
+ @SuppressWarnings("static-access")
+ Option mimeStat = OptionBuilder
+ .withArgName("mimeStats")
+ .withDescription(
+ "only display mimetype stats for the segment(s) instead of dumping file.")
+ .create("mimeStats");
+ @SuppressWarnings("static-access")
+ Option dirStructureOpt = OptionBuilder
+ .withArgName("flatdir")
+ .withDescription(
+ "optionally specify that the output directory should only contain files.")
+ .create("flatdir");
+ @SuppressWarnings("static-access")
+ Option reverseURLOutput = OptionBuilder
+ .withArgName("reverseUrlDirs")
+ .withDescription(
+ "optionally specify to use reverse URL folders for output structure.")
+ .create("reverseUrlDirs");
+
+ // create the options
+ Options options = new Options();
+ options.addOption(helpOpt);
+ options.addOption(outputOpt);
+ options.addOption(segOpt);
+ options.addOption(mimeOpt);
+ options.addOption(mimeStat);
+ options.addOption(dirStructureOpt);
+ options.addOption(reverseURLOutput);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("outputDir")
+ || (!line.hasOption("segment"))) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("FileDumper", options, true);
+ return;
+ }
+
+ File outputDir = new File(line.getOptionValue("outputDir"));
+ File segmentRootDir = new File(line.getOptionValue("segment"));
+ String[] mimeTypes = line.getOptionValues("mimetype");
+ boolean flatDir = line.hasOption("flatdir");
+ boolean shouldDisplayStats = false;
+ if (line.hasOption("mimeStats"))
+ shouldDisplayStats = true;
+ boolean reverseURLDump = false;
+ if (line.hasOption("reverseUrlDirs"))
+ reverseURLDump = true;
+
+ if (!outputDir.exists()) {
+ LOG.warn("Output directory: [" + outputDir.getAbsolutePath()
+ + "]: does not exist, creating it.");
+ if (!shouldDisplayStats) {
+ if (!outputDir.mkdirs())
+ throw new Exception("Unable to create: ["
+ + outputDir.getAbsolutePath() + "]");
+ }
+ }
+
+ FileDumper dumper = new FileDumper();
+ dumper.dump(outputDir, segmentRootDir, mimeTypes, flatDir, shouldDisplayStats, reverseURLDump);
+ } catch (Exception e) {
+ LOG.error("FileDumper: " + StringUtils.stringifyException(e));
+ e.printStackTrace();
+ return;
+ }
+ }
+
+ private void collectStats(Map<String, Integer> typeCounts, String mimeType) {
+ typeCounts.put(mimeType,
+ typeCounts.containsKey(mimeType) ? typeCounts.get(mimeType) + 1 : 1);
+ }
+
+}
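
As a hedged illustration (not part of the commit itself), the dumper can also be driven through its dump() method rather than the command line; the paths and mime type below are hypothetical:

    import java.io.File;
    import org.apache.nutch.tools.FileDumper;

    public class FileDumperSketch {
      public static void main(String[] args) throws Exception {
        File outputDir = new File("dump");              // hypothetical output directory
        File segmentRoot = new File("crawl/segments");  // directory containing one or more segments
        String[] mimeTypes = { "text/html" };           // restrict the dump to HTML documents
        // flatDir=false, mimeTypeStats=false, reverseURLDump=false
        new FileDumper().dump(outputDir, segmentRoot, mimeTypes, false, false, false);
      }
    }
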
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java b/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java
new file mode 100644
index 0000000..138372f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/FreeGenerator.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.URLPartitioner;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * This tool generates fetchlists (segments to be fetched) from plain text files
+ * containing one URL per line. It's useful when arbitrary URLs need to be
+ * fetched without adding them first to the CrawlDb, or during testing.
+ */
+public class FreeGenerator extends Configured implements Tool {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(FreeGenerator.class);
+
+ private static final String FILTER_KEY = "free.generator.filter";
+ private static final String NORMALIZE_KEY = "free.generator.normalize";
+
+ public static class FG extends MapReduceBase implements
+ Mapper<WritableComparable<?>, Text, Text, Generator.SelectorEntry>,
+ Reducer<Text, Generator.SelectorEntry, Text, CrawlDatum> {
+ private URLNormalizers normalizers = null;
+ private URLFilters filters = null;
+ private ScoringFilters scfilters;
+ private CrawlDatum datum = new CrawlDatum();
+ private Text url = new Text();
+ private int defaultInterval = 0;
+
+ @Override
+ public void configure(JobConf job) {
+ super.configure(job);
+ defaultInterval = job.getInt("db.fetch.interval.default", 0);
+ scfilters = new ScoringFilters(job);
+ if (job.getBoolean(FILTER_KEY, false)) {
+ filters = new URLFilters(job);
+ }
+ if (job.getBoolean(NORMALIZE_KEY, false)) {
+ normalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
+ }
+ }
+
+ Generator.SelectorEntry entry = new Generator.SelectorEntry();
+
+ public void map(WritableComparable<?> key, Text value,
+ OutputCollector<Text, Generator.SelectorEntry> output, Reporter reporter)
+ throws IOException {
+ // value is a line of text
+ String urlString = value.toString();
+ try {
+ if (normalizers != null) {
+ urlString = normalizers.normalize(urlString,
+ URLNormalizers.SCOPE_INJECT);
+ }
+ if (urlString != null && filters != null) {
+ urlString = filters.filter(urlString);
+ }
+ if (urlString != null) {
+ url.set(urlString);
+ scfilters.injectedScore(url, datum);
+ }
+ } catch (Exception e) {
+ LOG.warn("Error adding url '" + value.toString() + "', skipping: "
+ + StringUtils.stringifyException(e));
+ return;
+ }
+ if (urlString == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("- skipping " + value.toString());
+ }
+ return;
+ }
+ entry.datum = datum;
+ entry.url = url;
+ // https://issues.apache.org/jira/browse/NUTCH-1430
+ entry.datum.setFetchInterval(defaultInterval);
+ output.collect(url, entry);
+ }
+
+ public void reduce(Text key, Iterator<Generator.SelectorEntry> values,
+ OutputCollector<Text, CrawlDatum> output, Reporter reporter)
+ throws IOException {
+ // pick unique urls from values - discard the reduce key due to hash
+ // collisions
+ HashMap<Text, CrawlDatum> unique = new HashMap<Text, CrawlDatum>();
+ while (values.hasNext()) {
+ Generator.SelectorEntry entry = values.next();
+ unique.put(entry.url, entry.datum);
+ }
+ // output unique urls
+ for (Entry<Text, CrawlDatum> e : unique.entrySet()) {
+ output.collect(e.getKey(), e.getValue());
+ }
+ }
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err
+ .println("Usage: FreeGenerator <inputDir> <segmentsDir> [-filter] [-normalize]");
+ System.err
+ .println("\tinputDir\tinput directory containing one or more input files.");
+ System.err
+ .println("\t\tEach text file contains a list of URLs, one URL per line");
+ System.err
+ .println("\tsegmentsDir\toutput directory, where new segment will be created");
+ System.err.println("\t-filter\trun current URLFilters on input URLs");
+ System.err
+ .println("\t-normalize\trun current URLNormalizers on input URLs");
+ return -1;
+ }
+ boolean filter = false;
+ boolean normalize = false;
+ if (args.length > 2) {
+ for (int i = 2; i < args.length; i++) {
+ if (args[i].equals("-filter")) {
+ filter = true;
+ } else if (args[i].equals("-normalize")) {
+ normalize = true;
+ } else {
+ LOG.error("Unknown argument: " + args[i] + ", exiting ...");
+ return -1;
+ }
+ }
+ }
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("FreeGenerator: starting at " + sdf.format(start));
+
+ JobConf job = new NutchJob(getConf());
+ job.setBoolean(FILTER_KEY, filter);
+ job.setBoolean(NORMALIZE_KEY, normalize);
+ FileInputFormat.addInputPath(job, new Path(args[0]));
+ job.setInputFormat(TextInputFormat.class);
+ job.setMapperClass(FG.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Generator.SelectorEntry.class);
+ job.setPartitionerClass(URLPartitioner.class);
+ job.setReducerClass(FG.class);
+ String segName = Generator.generateSegmentName();
+ job.setNumReduceTasks(job.getNumMapTasks());
+ job.setOutputFormat(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(CrawlDatum.class);
+ job.setOutputKeyComparatorClass(Generator.HashComparator.class);
+ FileOutputFormat.setOutputPath(job, new Path(args[1], new Path(segName,
+ CrawlDatum.GENERATE_DIR_NAME)));
+ try {
+ JobClient.runJob(job);
+ } catch (Exception e) {
+ LOG.error("FAILED: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("FreeGenerator: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(), new FreeGenerator(),
+ args);
+ System.exit(res);
+ }
+}
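
A minimal usage sketch, mirroring what main() does; the input and output paths are hypothetical and the two flags are optional:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.tools.FreeGenerator;
    import org.apache.nutch.util.NutchConfiguration;

    public class FreeGeneratorSketch {
      public static void main(String[] args) throws Exception {
        // "urls" holds plain text files with one URL per line; a new segment is
        // written under "crawl/segments".
        int res = ToolRunner.run(NutchConfiguration.create(), new FreeGenerator(),
            new String[] { "urls", "crawl/segments", "-filter", "-normalize" });
        System.exit(res);
      }
    }
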
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java b/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java
new file mode 100644
index 0000000..2b1c63b
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/ResolveUrls.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.net.InetAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.nutch.util.URLUtil;
+
+/**
+ * A simple tool that spins up multiple threads to resolve URLs to IP
+ * addresses. It can be used to verify that pages failing with
+ * UnknownHostException during fetching point at genuinely bad hosts rather
+ * than failing because of a DNS problem on the fetching side.
+ */
+public class ResolveUrls {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ResolveUrls.class);
+
+ private String urlsFile = null;
+ private int numThreads = 100;
+ private ExecutorService pool = null;
+ private static AtomicInteger numTotal = new AtomicInteger(0);
+ private static AtomicInteger numErrored = new AtomicInteger(0);
+ private static AtomicInteger numResolved = new AtomicInteger(0);
+ private static AtomicLong totalTime = new AtomicLong(0L);
+
+ /**
+ * A Thread which gets the ip address of a single host by name.
+ */
+ private static class ResolverThread extends Thread {
+
+ private String url = null;
+
+ public ResolverThread(String url) {
+ this.url = url;
+ }
+
+ public void run() {
+
+ numTotal.incrementAndGet();
+ String host = URLUtil.getHost(url);
+ long start = System.currentTimeMillis();
+ try {
+
+ // get the address by name and if no error is thrown then it
+ // is resolved successfully
+ InetAddress.getByName(host);
+ LOG.info("Resolved: " + host);
+ numResolved.incrementAndGet();
+ } catch (Exception uhe) {
+ LOG.info("Error Resolving: " + host);
+ numErrored.incrementAndGet();
+ }
+ long end = System.currentTimeMillis();
+ long total = (end - start);
+ totalTime.addAndGet(total);
+ LOG.info(", " + total + " millis");
+ }
+ }
+
+ /**
+ * Creates a thread pool for resolving URLs and reads the URL file from the
+ * local filesystem. Each URL is resolved in its own task, keeping a running
+ * count of how many resolved, how many errored, and the total time spent.
+ */
+ public void resolveUrls() {
+
+ try {
+
+ // create a thread pool with a fixed number of threads
+ pool = Executors.newFixedThreadPool(numThreads);
+
+ // read in the urls file and loop through each line, one url per line
+ BufferedReader buffRead = new BufferedReader(new FileReader(new File(
+ urlsFile)));
+ String urlStr = null;
+ while ((urlStr = buffRead.readLine()) != null) {
+
+ // spin up a resolver thread per url
+ LOG.info("Starting: " + urlStr);
+ pool.execute(new ResolverThread(urlStr));
+ }
+
+ // close the file and wait for up to 60 seconds before shutting down
+ // the thread pool to give urls time to finish resolving
+ buffRead.close();
+ pool.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (Exception e) {
+
+ // on error shutdown the thread pool immediately
+ pool.shutdownNow();
+ LOG.info(StringUtils.stringifyException(e));
+ }
+
+ // shutdown the thread pool and log totals
+ pool.shutdown();
+ LOG.info("Total: " + numTotal.get() + ", Resovled: " + numResolved.get()
+ + ", Errored: " + numErrored.get() + ", Average Time: "
+ + totalTime.get() / numTotal.get());
+ }
+
+ /**
+ * Create a new ResolveUrls with a file from the local file system.
+ *
+ * @param urlsFile
+ * The local urls file, one url per line.
+ */
+ public ResolveUrls(String urlsFile) {
+ this(urlsFile, 100);
+ }
+
+ /**
+ * Create a new ResolveUrls with a urls file and a number of threads for the
+ * Thread pool. Number of threads is 100 by default.
+ *
+ * @param urlsFile
+ * The local urls file, one url per line.
+ * @param numThreads
+ * The number of threads used to resolve urls in parallel.
+ */
+ public ResolveUrls(String urlsFile, int numThreads) {
+ this.urlsFile = urlsFile;
+ this.numThreads = numThreads;
+ }
+
+ /**
+ * Runs the resolve urls tool.
+ */
+ public static void main(String[] args) {
+
+ Options options = new Options();
+ OptionBuilder.withArgName("help");
+ OptionBuilder.withDescription("show this help message");
+ Option helpOpts = OptionBuilder.create("help");
+ options.addOption(helpOpts);
+
+ OptionBuilder.withArgName("urls");
+ OptionBuilder.hasArg();
+ OptionBuilder.withDescription("the urls file to check");
+ Option urlOpts = OptionBuilder.create("urls");
+ options.addOption(urlOpts);
+
+ OptionBuilder.withArgName("numThreads");
+ OptionBuilder.hasArgs();
+ OptionBuilder.withDescription("the number of threads to use");
+ Option numThreadOpts = OptionBuilder.create("numThreads");
+ options.addOption(numThreadOpts);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+ // parse out common line arguments
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("urls")) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("ResolveUrls", options);
+ return;
+ }
+
+ // get the urls and the number of threads and start the resolver
+ String urls = line.getOptionValue("urls");
+ int numThreads = 100;
+ String numThreadsStr = line.getOptionValue("numThreads");
+ if (numThreadsStr != null) {
+ numThreads = Integer.parseInt(numThreadsStr);
+ }
+ ResolveUrls resolve = new ResolveUrls(urls, numThreads);
+ resolve.resolveUrls();
+ } catch (Exception e) {
+ LOG.error("ResolveUrls: " + StringUtils.stringifyException(e));
+ }
+ }
+}
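
For reference (illustrative only), the resolver can be used programmatically as well; the file name and thread count here are hypothetical:

    import org.apache.nutch.tools.ResolveUrls;

    public class ResolveUrlsSketch {
      public static void main(String[] args) {
        // Resolve the host of every URL listed in urls.txt using 50 threads,
        // logging per-host results and a final summary.
        new ResolveUrls("urls.txt", 50).resolveUrls();
      }
    }
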
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java b/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java
new file mode 100644
index 0000000..d8ae0b3
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/WARCUtils.java
@@ -0,0 +1,154 @@
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.util.StringUtil;
+import org.archive.format.http.HttpHeaders;
+import org.archive.format.warc.WARCConstants;
+import org.archive.io.warc.WARCRecordInfo;
+import org.archive.uid.UUIDGenerator;
+import org.archive.util.DateUtils;
+import org.archive.util.anvl.ANVLRecord;
+
+public class WARCUtils {
+ public final static String SOFTWARE = "software";
+ public final static String HTTP_HEADER_FROM = "http-header-from";
+ public final static String HTTP_HEADER_USER_AGENT = "http-header-user-agent";
+ public final static String HOSTNAME = "hostname";
+ public final static String ROBOTS = "robots";
+ public final static String OPERATOR = "operator";
+ public final static String FORMAT = "format";
+ public final static String CONFORMS_TO = "conformsTo";
+ public final static String IP = "ip";
+ public final static UUIDGenerator generator = new UUIDGenerator();
+
+ public static final ANVLRecord getWARCInfoContent(Configuration conf) {
+ ANVLRecord record = new ANVLRecord();
+
+ // informative headers
+ record.addLabelValue(FORMAT, "WARC File Format 1.0");
+ record.addLabelValue(CONFORMS_TO, "http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf");
+
+ record.addLabelValue(SOFTWARE, conf.get("http.agent.name", ""));
+ record.addLabelValue(HTTP_HEADER_USER_AGENT,
+ getAgentString(conf.get("http.agent.name", ""),
+ conf.get("http.agent.version", ""),
+ conf.get("http.agent.description", ""),
+ conf.get("http.agent.url", ""),
+ conf.get("http.agent.email", "")));
+ record.addLabelValue(HTTP_HEADER_FROM,
+ conf.get("http.agent.email", ""));
+
+ try {
+ record.addLabelValue(HOSTNAME, getHostname(conf));
+ record.addLabelValue(IP, getIPAddress(conf));
+ } catch (UnknownHostException ignored) {
+ // do nothing as these fields are optional
+ }
+
+ record.addLabelValue(ROBOTS, "classic"); // TODO Make configurable?
+ record.addLabelValue(OPERATOR, conf.get("http.agent.email", ""));
+
+ return record;
+ }
+
+ public static final String getHostname(Configuration conf)
+ throws UnknownHostException {
+
+ return StringUtil.isEmpty(conf.get("http.agent.host", "")) ?
+ InetAddress.getLocalHost().getHostName() :
+ conf.get("http.agent.host");
+ }
+
+ public static final String getIPAddress(Configuration conf)
+ throws UnknownHostException {
+
+ return InetAddress.getLocalHost().getHostAddress();
+ }
+
+ public static final byte[] toByteArray(HttpHeaders headers)
+ throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ headers.write(out);
+
+ return out.toByteArray();
+ }
+
+ public static final String getAgentString(String name, String version,
+ String description, String URL, String email) {
+
+ StringBuffer buf = new StringBuffer();
+
+ buf.append(name);
+
+ if (version != null) {
+ buf.append("/").append(version);
+ }
+
+ if (((description != null) && (description.length() != 0)) || (
+ (email != null) && (email.length() != 0)) || ((URL != null) && (
+ URL.length() != 0))) {
+ buf.append(" (");
+
+ if ((description != null) && (description.length() != 0)) {
+ buf.append(description);
+ if ((URL != null) || (email != null))
+ buf.append("; ");
+ }
+
+ if ((URL != null) && (URL.length() != 0)) {
+ buf.append(URL);
+ if (email != null)
+ buf.append("; ");
+ }
+
+ if ((email != null) && (email.length() != 0))
+ buf.append(email);
+
+ buf.append(")");
+ }
+
+ return buf.toString();
+ }
+
+ public static final WARCRecordInfo docToMetadata(NutchDocument doc)
+ throws UnsupportedEncodingException {
+ WARCRecordInfo record = new WARCRecordInfo();
+
+ record.setType(WARCConstants.WARCRecordType.metadata);
+ record.setUrl((String) doc.getFieldValue("id"));
+ record.setCreate14DigitDate(
+ DateUtils.get14DigitDate((Date) doc.getFieldValue("tstamp")));
+ record.setMimetype("application/warc-fields");
+ record.setRecordId(generator.getRecordID());
+
+ // metadata
+ ANVLRecord metadata = new ANVLRecord();
+
+ for (String field : doc.getFieldNames()) {
+ List<Object> values = doc.getField(field).getValues();
+ for (Object value : values) {
+ if (value instanceof Date) {
+ metadata.addLabelValue(field, DateUtils.get14DigitDate());
+ } else {
+ metadata.addLabelValue(field, (String) value);
+ }
+ }
+ }
+
+ record.setContentLength(metadata.getLength());
+ record.setContentStream(
+ new ByteArrayInputStream(metadata.getUTF8Bytes()));
+
+ return record;
+ }
+}
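
A small, hedged sketch of how the warcinfo helper might be used; it only assumes the usual http.agent.* properties in the Nutch configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.tools.WARCUtils;
    import org.apache.nutch.util.NutchConfiguration;
    import org.archive.util.anvl.ANVLRecord;

    public class WARCUtilsSketch {
      public static void main(String[] args) {
        Configuration conf = NutchConfiguration.create();
        // Build the ANVL block describing the crawler for a WARC "warcinfo" record.
        ANVLRecord info = WARCUtils.getWARCInfoContent(conf);
        System.out.println(info);
      }
    }
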
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java
new file mode 100644
index 0000000..0eb7bf6
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcInputFormat.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools.arc;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * An input format that reads arc files.
+ */
+public class ArcInputFormat extends FileInputFormat<Text, BytesWritable> {
+
+ /**
+ * Returns the <code>RecordReader</code> for reading the arc file.
+ *
+ * @param split
+ * The InputSplit of the arc file to process.
+ * @param job
+ * The job configuration.
+ * @param reporter
+ * The progress reporter.
+ */
+ public RecordReader<Text, BytesWritable> getRecordReader(InputSplit split,
+ JobConf job, Reporter reporter) throws IOException {
+ reporter.setStatus(split.toString());
+ return new ArcRecordReader(job, (FileSplit) split);
+ }
+
+}
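
A minimal sketch (not drawn from the files above) of wiring the input format into an old-style mapred job; the "arcs" input directory is hypothetical:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.tools.arc.ArcInputFormat;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.NutchJob;

    public class ArcInputFormatSketch {
      public static void main(String[] args) {
        JobConf job = new NutchJob(NutchConfiguration.create());
        // Keys are arc record headers (Text), values are the raw record bytes (BytesWritable).
        job.setInputFormat(ArcInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("arcs"));
      }
    }
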
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java
new file mode 100644
index 0000000..e9ff58d
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcRecordReader.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools.arc;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * <p>
+ * The <code>ArcRecordReader</code> class provides a record reader which reads
+ * records from arc files.
+ * </p>
+ *
+ * <p>
+ * Arc files are essentially tars of gzips. Each record in an arc file is a
+ * compressed gzip. Multiple records are concatenated together to form a
+ * complete arc. For more information on the arc file format see
+ * http://www.archive.org/web/researcher/ArcFileFormat.php .
+ * </p>
+ *
+ * <p>
+ * Arc files are used by the internet archive and grub projects.
+ * </p>
+ *
+ * See http://www.archive.org/ and http://www.grub.org/ .
+ */
+public class ArcRecordReader implements RecordReader<Text, BytesWritable> {
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(ArcRecordReader.class);
+
+ protected Configuration conf;
+ protected long splitStart = 0;
+ protected long pos = 0;
+ protected long splitEnd = 0;
+ protected long splitLen = 0;
+ protected long fileLen = 0;
+ protected FSDataInputStream in;
+
+ private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
+
+ /**
+ * <p>
+ * Returns true if the byte array passed matches the gzip header magic number.
+ * </p>
+ *
+ * @param input
+ * The byte array to check.
+ *
+ * @return True if the byte array matches the gzip header magic number.
+ */
+ public static boolean isMagic(byte[] input) {
+
+ // check for null and incorrect length
+ if (input == null || input.length != MAGIC.length) {
+ return false;
+ }
+
+ // check byte by byte
+ for (int i = 0; i < MAGIC.length; i++) {
+ if (MAGIC[i] != input[i]) {
+ return false;
+ }
+ }
+
+ // must match
+ return true;
+ }
+
+ /**
+ * Constructor that sets the configuration and file split.
+ *
+ * @param conf
+ * The job configuration.
+ * @param split
+ * The file split to read from.
+ *
+ * @throws IOException
+ * If an IO error occurs while initializing file split.
+ */
+ public ArcRecordReader(Configuration conf, FileSplit split)
+ throws IOException {
+
+ Path path = split.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ fileLen = fs.getFileStatus(split.getPath()).getLen();
+ this.conf = conf;
+ this.in = fs.open(split.getPath());
+ this.splitStart = split.getStart();
+ this.splitEnd = splitStart + split.getLength();
+ this.splitLen = split.getLength();
+ in.seek(splitStart);
+ }
+
+ /**
+ * Closes the record reader resources.
+ */
+ public void close() throws IOException {
+ this.in.close();
+ }
+
+ /**
+ * Creates a new instance of the <code>Text</code> object for the key.
+ */
+ public Text createKey() {
+ return ReflectionUtils.newInstance(Text.class, conf);
+ }
+
+ /**
+ * Creates a new instance of the <code>BytesWritable</code> object for the key
+ */
+ public BytesWritable createValue() {
+ return ReflectionUtils.newInstance(BytesWritable.class, conf);
+ }
+
+ /**
+ * Returns the current position in the file.
+ *
+ * @return The long of the current position in the file.
+ */
+ public long getPos() throws IOException {
+ return in.getPos();
+ }
+
+ /**
+ * Returns the percentage of progress in processing the file. This will be
+ * represented as a float from 0 to 1 with 1 being 100% completed.
+ *
+ * @return The percentage of progress as a float from 0 to 1.
+ */
+ public float getProgress() throws IOException {
+
+ // if we haven't even started
+ if (splitEnd == splitStart) {
+ return 0.0f;
+ } else {
+ // the progress is current pos - where we started / length of the split
+ return Math.min(1.0f, (getPos() - splitStart) / (float) splitLen);
+ }
+ }
+
+ /**
+ * <p>
+ * Returns true if the next record in the split is read into the key and value
+ * pair. The key will be the arc record header and the values will be the raw
+ * content bytes of the arc record.
+ * </p>
+ *
+ * @param key
+ * The record key
+ * @param value
+ * The record value
+ *
+ * @return True if the next record is read.
+ *
+ * @throws IOException
+ * If an error occurs while reading the record value.
+ */
+ public boolean next(Text key, BytesWritable value) throws IOException {
+
+ try {
+
+ // get the starting position on the input stream
+ long startRead = in.getPos();
+ byte[] magicBuffer = null;
+
+ // we need this loop to handle false positives in reading of gzip records
+ while (true) {
+
+ // while we haven't passed the end of the split
+ if (startRead >= splitEnd) {
+ return false;
+ }
+
+ // scanning for the gzip header
+ boolean foundStart = false;
+ while (!foundStart) {
+
+ // start at the current file position and scan for 1K at time, break
+ // if there is no more to read
+ startRead = in.getPos();
+ magicBuffer = new byte[1024];
+ int read = in.read(magicBuffer);
+ if (read < 0) {
+ break;
+ }
+
+ // scan the byte array for the gzip header magic number. This happens
+ // byte by byte
+ for (int i = 0; i < read - 1; i++) {
+ byte[] testMagic = new byte[2];
+ System.arraycopy(magicBuffer, i, testMagic, 0, 2);
+ if (isMagic(testMagic)) {
+ // set the next start to the current gzip header
+ startRead += i;
+ foundStart = true;
+ break;
+ }
+ }
+ }
+
+ // seek to the start of the gzip header
+ in.seek(startRead);
+ ByteArrayOutputStream baos = null;
+ int totalRead = 0;
+
+ try {
+
+ // read 4K of the gzip at a time putting into a byte array
+ byte[] buffer = new byte[4096];
+ GZIPInputStream zin = new GZIPInputStream(in);
+ int gzipRead = -1;
+ baos = new ByteArrayOutputStream();
+ while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) {
+ baos.write(buffer, 0, gzipRead);
+ totalRead += gzipRead;
+ }
+ } catch (Exception e) {
+
+ // there are times we get false positives where the gzip header exists
+ // but it is not an actual gzip record, so we ignore it and start
+ // over seeking
+ System.out.println("Ignoring position: " + (startRead));
+ if (startRead + 1 < fileLen) {
+ in.seek(startRead + 1);
+ }
+ continue;
+ }
+
+ // change the output stream to a byte array
+ byte[] content = baos.toByteArray();
+
+ // the first line of the raw content in arc files is the header
+ int eol = 0;
+ for (int i = 0; i < content.length; i++) {
+ if (i > 0 && content[i] == '\n') {
+ eol = i;
+ break;
+ }
+ }
+
+ // create the header and the raw content minus the header
+ String header = new String(content, 0, eol).trim();
+ byte[] raw = new byte[(content.length - eol) - 1];
+ System.arraycopy(content, eol + 1, raw, 0, raw.length);
+
+ // populate key and values with the header and raw content.
+ Text keyText = key;
+ keyText.set(header);
+ BytesWritable valueBytes = value;
+ valueBytes.set(raw, 0, raw.length);
+
+ // TODO: It would be best to start at the end of the gzip read but
+ // the bytes read in gzip don't match raw bytes in the file so we
+ // overshoot the next header. With this current method you get
+ // some false positives but don't miss records.
+ if (startRead + 1 < fileLen) {
+ in.seek(startRead + 1);
+ }
+
+ // populated the record, now return
+ return true;
+ }
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ }
+
+ // couldn't populate the record or there is no next record to read
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
new file mode 100644
index 0000000..39b8d95
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/ArcSegmentCreator.java
@@ -0,0 +1,426 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.tools.arc;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map.Entry;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.crawl.SignatureFactory;
+import org.apache.nutch.fetcher.FetcherOutputFormat;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.Parse;
+import org.apache.nutch.parse.ParseImpl;
+import org.apache.nutch.parse.ParseResult;
+import org.apache.nutch.parse.ParseStatus;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.parse.ParseUtil;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.protocol.ProtocolStatus;
+import org.apache.nutch.scoring.ScoringFilters;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.StringUtil;
+import org.apache.nutch.util.TimingUtil;
+
+/**
+ * <p>
+ * The <code>ArcSegmentCreator</code> is a replacement for the fetcher that
+ * takes arc files as input and produces a Nutch segment as output.
+ * </p>
+ *
+ * <p>
+ * Arc files are sequences of individually gzipped records produced by both
+ * the Internet Archive project and the Grub distributed crawler project.
+ * </p>
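+ *
+ * <p>
+ * Usage: <code>ArcSegmentCreator &lt;arcFiles&gt; &lt;segmentsOutDir&gt;</code>
+ * </p>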
+ *
+ */
+public class ArcSegmentCreator extends Configured implements Tool,
+ Mapper<Text, BytesWritable, Text, NutchWritable> {
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(ArcSegmentCreator.class);
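+ /** Metadata key under which the arc record version field is stored. */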
+ public static final String URL_VERSION = "arc.url.version";
+ private JobConf jobConf;
+ private URLFilters urlFilters;
+ private ScoringFilters scfilters;
+ private ParseUtil parseUtil;
+ private URLNormalizers normalizers;
+ private int interval;
+
+ private static SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
+
+ public ArcSegmentCreator() {
+
+ }
+
+ /**
+ * <p>
+ * Constructor that sets the job configuration.
+ * </p>
+ *
+ * @param conf The job configuration.
+ */
+ public ArcSegmentCreator(Configuration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Generates a timestamp-based name for the segments.
+ *
+ * @return The generated segment name.
+ */
+ public static synchronized String generateSegmentName() {
+ try {
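+ // sleep for one second so that consecutive calls get distinct
+ // second-resolution timestamps and therefore unique segment names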
+ Thread.sleep(1000);
+ } catch (Throwable t) {
+ }
+ return sdf.format(new Date(System.currentTimeMillis()));
+ }
+
+ /**
+ * <p>
+ * Configures the job. Sets the url filters, scoring filters, url normalizers
+ * and other relevant data.
+ * </p>
+ *
+ * @param job
+ * The job configuration.
+ */
+ public void configure(JobConf job) {
+
+ // set the url filters, scoring filters the parse util and the url
+ // normalizers
+ this.jobConf = job;
+ this.urlFilters = new URLFilters(jobConf);
+ this.scfilters = new ScoringFilters(jobConf);
+ this.parseUtil = new ParseUtil(jobConf);
+ this.normalizers = new URLNormalizers(jobConf, URLNormalizers.SCOPE_FETCHER);
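+ // default re-fetch interval in seconds (2592000 seconds = 30 days)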
+ interval = jobConf.getInt("db.fetch.interval.default", 2592000);
+ }
+
+ public void close() {
+ }
+
+ /**
+ * <p>
+ * Parses the raw content of a single record to create output. This method is
+ * almost the same as the output method of
+ * {@link org.apache.nutch.fetcher.Fetcher} in terms of processing and output.
+ * </p>
+ *
+ * @param output
+ * The job output collector.
+ * @param segmentName
+ * The name of the segment to create.
+ * @param key
+ * The url of the record.
+ * @param datum
+ * The CrawlDatum of the record.
+ * @param content
+ * The raw content of the record
+ * @param pstatus
+ * The protocol status
+ * @param status
+ * The fetch status.
+ *
+ * @return The result of the parse in a ParseStatus object.
+ */
+ private ParseStatus output(OutputCollector<Text, NutchWritable> output,
+ String segmentName, Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status) {
+
+ // set the fetch status and the fetch time
+ datum.setStatus(status);
+ datum.setFetchTime(System.currentTimeMillis());
+ if (pstatus != null)
+ datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+
+ ParseResult parseResult = null;
+ if (content != null) {
+ Metadata metadata = content.getMetadata();
+ // add segment to metadata
+ metadata.set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ // add score to content metadata so that ParseSegment can pick it up.
+ try {
+ scfilters.passScoreBeforeParsing(key, datum, content);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+
+ try {
+
+ // parse the content
+ parseResult = this.parseUtil.parse(content);
+ } catch (Exception e) {
+ LOG.warn("Error parsing: " + key + ": "
+ + StringUtils.stringifyException(e));
+ }
+
+ // if parsing failed, set the signature computed from an empty parse
+ if (parseResult == null) {
+ byte[] signature = SignatureFactory.getSignature(getConf()).calculate(
+ content, new ParseStatus().getEmptyParse(getConf()));
+ datum.setSignature(signature);
+ }
+
+ try {
+ output.collect(key, new NutchWritable(datum));
+ output.collect(key, new NutchWritable(content));
+
+ if (parseResult != null) {
+ for (Entry<Text, Parse> entry : parseResult) {
+ Text url = entry.getKey();
+ Parse parse = entry.getValue();
+ ParseStatus parseStatus = parse.getData().getStatus();
+
+ if (!parseStatus.isSuccess()) {
+ LOG.warn("Error parsing: " + key + ": " + parseStatus);
+ parse = parseStatus.getEmptyParse(getConf());
+ }
+
+ // Calculate page signature.
+ byte[] signature = SignatureFactory.getSignature(getConf())
+ .calculate(content, parse);
+ // Ensure segment name and score are in parseData metadata
+ parse.getData().getContentMeta()
+ .set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ parse.getData().getContentMeta()
+ .set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
+ // Pass fetch time to content meta
+ parse.getData().getContentMeta()
+ .set(Nutch.FETCH_TIME_KEY, Long.toString(datum.getFetchTime()));
+ if (url.equals(key))
+ datum.setSignature(signature);
+ try {
+ scfilters.passScoreAfterParsing(url, content, parse);
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
+ }
+ }
+ output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+ parse.getText()), parse.getData(), parse.isCanonical())));
+ }
+ }
+ } catch (IOException e) {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("ArcSegmentCreator caught:"
+ + StringUtils.stringifyException(e));
+ }
+ }
+
+ // return the parse status if it exists
+ if (parseResult != null && !parseResult.isEmpty()) {
+ Parse p = parseResult.get(content.getUrl());
+ if (p != null) {
+ return p.getData().getStatus();
+ }
+ }
+ }
+
+ return null;
+ }
+
+ /**
+ * <p>
+ * Logs any error that occurs during conversion.
+ * </p>
+ *
+ * @param url
+ * The url we are parsing.
+ * @param t
+ * The error that occurred.
+ */
+ private void logError(Text url, Throwable t) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Conversion of " + url + " failed with: "
+ + StringUtils.stringifyException(t));
+ }
+ }
+
+ /**
+ * <p>
+ * Maps a single arc record into output for the Nutch segment.
+ * </p>
+ *
+ * @param key
+ * The arc record header.
+ * @param bytes
+ * The arc record raw content bytes.
+ * @param output
+ * The output collector.
+ * @param reporter
+ * The progress reporter.
+ */
+ public void map(Text key, BytesWritable bytes,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
+
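+ // the arc record header is a single whitespace-delimited line; the fields
+ // used below are the url, the record version and the content type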
+ String[] headers = key.toString().split("\\s+");
+ String urlStr = headers[0];
+ String version = headers[2];
+ String contentType = headers[3];
+
+ // arc files start with a file description record. For now we ignore it as
+ // it is not a content record
+ if (urlStr.startsWith("filedesc://")) {
+ LOG.info("Ignoring file header: " + urlStr);
+ return;
+ }
+ LOG.info("Processing: " + urlStr);
+
+ // get the raw bytes from the arc file, create a new crawldatum
+ Text url = new Text();
+ CrawlDatum datum = new CrawlDatum(CrawlDatum.STATUS_DB_FETCHED, interval,
+ 1.0f);
+ String segmentName = getConf().get(Nutch.SEGMENT_NAME_KEY);
+
+ // normalize and filter the urls
+ try {
+ urlStr = normalizers.normalize(urlStr, URLNormalizers.SCOPE_FETCHER);
+ urlStr = urlFilters.filter(urlStr); // filter the url
+ } catch (Exception e) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Skipping " + url + ":" + e);
+ }
+ urlStr = null;
+ }
+
+ // if still a good url then process
+ if (urlStr != null) {
+
+ url.set(urlStr);
+ try {
+
+ // set the protocol status to success and the crawl status to success
+ // create the content from the normalized url and the raw bytes from
+ // the arc file. TODO: currently this doesn't handle the text of error
+ // pages (e.g. 404, etc.). We assume we won't get those.
+ ProtocolStatus status = ProtocolStatus.STATUS_SUCCESS;
+ Content content = new Content(urlStr, urlStr, bytes.getBytes(),
+ contentType, new Metadata(), getConf());
+
+ // set the url version into the metadata
+ content.getMetadata().set(URL_VERSION, version);
+ ParseStatus pstatus = output(output, segmentName, url, datum, content,
+ status, CrawlDatum.STATUS_FETCH_SUCCESS);
+ reporter.progress();
+ } catch (Throwable t) { // unexpected exception
+ logError(url, t);
+ output(output, segmentName, url, datum, null, null,
+ CrawlDatum.STATUS_FETCH_RETRY);
+ }
+ }
+ }
+
+ /**
+ * <p>
+ * Creates and runs the arc files to segments job.
+ * </p>
+ *
+ * @param arcFiles
+ * The path to the directory holding the arc files
+ * @param segmentsOutDir
+ * The output directory for writing the segments
+ *
+ * @throws IOException
+ * If an IO error occurs while running the job.
+ */
+ public void createSegments(Path arcFiles, Path segmentsOutDir)
+ throws IOException {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("ArcSegmentCreator: starting at " + sdf.format(start));
+ LOG.info("ArcSegmentCreator: arc files dir: " + arcFiles);
+ }
+
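+ // configure the job: read the arc files with ArcInputFormat, map each
+ // record with ArcSegmentCreator and write a segment with FetcherOutputFormat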
+ JobConf job = new NutchJob(getConf());
+ job.setJobName("ArcSegmentCreator " + arcFiles);
+ String segName = generateSegmentName();
+ job.set(Nutch.SEGMENT_NAME_KEY, segName);
+ FileInputFormat.addInputPath(job, arcFiles);
+ job.setInputFormat(ArcInputFormat.class);
+ job.setMapperClass(ArcSegmentCreator.class);
+ FileOutputFormat.setOutputPath(job, new Path(segmentsOutDir, segName));
+ job.setOutputFormat(FetcherOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NutchWritable.class);
+
+ JobClient.runJob(job);
+
+ long end = System.currentTimeMillis();
+ LOG.info("ArcSegmentCreator: finished at " + sdf.format(end)
+ + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ }
+
+ public static void main(String args[]) throws Exception {
+ int res = ToolRunner.run(NutchConfiguration.create(),
+ new ArcSegmentCreator(), args);
+ System.exit(res);
+ }
+
+ public int run(String[] args) throws Exception {
+
+ String usage = "Usage: ArcSegmentCreator <arcFiles> <segmentsOutDir>";
+
+ if (args.length < 2) {
+ System.err.println(usage);
+ return -1;
+ }
+
+ // set the arc files directory and the segments output directory
+ Path arcFiles = new Path(args[0]);
+ Path segmentsOutDir = new Path(args[1]);
+
+ try {
+ // create the segments from the arc files
+ createSegments(arcFiles, segmentsOutDir);
+ return 0;
+ } catch (Exception e) {
+ LOG.error("ArcSegmentCreator: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java b/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java
new file mode 100644
index 0000000..cb6e115
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/arc/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Tools to read the
+ * <a href="http://archive.org/web/researcher/ArcFileFormat.php">Arc file format</a>.
+ */
+package org.apache.nutch.tools.arc;
+
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java b/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java
new file mode 100644
index 0000000..3b868c5
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Miscellaneous tools.
+ */
+package org.apache.nutch.tools;
+