You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:52 UTC
[36/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build
for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
new file mode 100644
index 0000000..1b425c4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.URLEncoder;
+import java.net.UnknownHostException;
+import java.text.ParseException;
+import java.util.List;
+
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ibm.icu.text.SimpleDateFormat;
+
+/**
+ * Abstract class that implements {@see CommonCrawlFormat} interface.
+ *
+ */
+public abstract class AbstractCommonCrawlFormat implements CommonCrawlFormat {
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommonCrawlFormat.class.getName());
+
+ protected String url;
+
+ protected Content content;
+
+ protected Metadata metadata;
+
+ protected Configuration conf;
+
+ protected String keyPrefix;
+
+ protected boolean simpleDateFormat;
+
+ protected boolean jsonArray;
+
+ protected boolean reverseKey;
+
+ protected String reverseKeyValue;
+
+ protected List<String> inLinks;
+
+ public AbstractCommonCrawlFormat(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+ this.url = url;
+ this.content = content;
+ this.metadata = metadata;
+ this.conf = nutchConf;
+
+ this.keyPrefix = config.getKeyPrefix();
+ this.simpleDateFormat = config.getSimpleDateFormat();
+ this.jsonArray = config.getJsonArray();
+ this.reverseKey = config.getReverseKey();
+ this.reverseKeyValue = config.getReverseKeyValue();
+ }
+
+ public String getJsonData(String url, Content content, Metadata metadata)
+ throws IOException {
+ this.url = url;
+ this.content = content;
+ this.metadata = metadata;
+
+ return this.getJsonData();
+ }
+
+ public String getJsonData(String url, Content content, Metadata metadata,
+ ParseData parseData) throws IOException {
+
+ // override of this is required in the actual formats
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public String getJsonData() throws IOException {
+ try {
+ startObject(null);
+
+ // url
+ writeKeyValue("url", getUrl());
+
+ // timestamp
+ writeKeyValue("timestamp", getTimestamp());
+
+ // request
+ startObject("request");
+ writeKeyValue("method", getMethod());
+ startObject("client");
+ writeKeyValue("hostname", getRequestHostName());
+ writeKeyValue("address", getRequestHostAddress());
+ writeKeyValue("software", getRequestSoftware());
+ writeKeyValue("robots", getRequestRobots());
+ startObject("contact");
+ writeKeyValue("name", getRequestContactName());
+ writeKeyValue("email", getRequestContactEmail());
+ closeObject("contact");
+ closeObject("client");
+ // start request headers
+ startHeaders("headers", false, true);
+ writeKeyValueWrapper("Accept", getRequestAccept());
+ writeKeyValueWrapper("Accept-Encoding", getRequestAcceptEncoding());
+ writeKeyValueWrapper("Accept-Language", getRequestAcceptLanguage());
+ writeKeyValueWrapper("User-Agent", getRequestUserAgent());
+ //closeObject("headers");
+ closeHeaders("headers", false, true);
+ writeKeyNull("body");
+ closeObject("request");
+
+ // response
+ startObject("response");
+ writeKeyValue("status", getResponseStatus());
+ startObject("server");
+ writeKeyValue("hostname", getResponseHostName());
+ writeKeyValue("address", getResponseAddress());
+ closeObject("server");
+ // start response headers
+ startHeaders("headers", false, true);
+ writeKeyValueWrapper("Content-Encoding", getResponseContentEncoding());
+ writeKeyValueWrapper("Content-Type", getResponseContentType());
+ writeKeyValueWrapper("Date", getResponseDate());
+ writeKeyValueWrapper("Server", getResponseServer());
+ for (String name : metadata.names()) {
+ if (name.equalsIgnoreCase("Content-Encoding") || name.equalsIgnoreCase("Content-Type") || name.equalsIgnoreCase("Date") || name.equalsIgnoreCase("Server")) {
+ continue;
+ }
+ writeKeyValueWrapper(name, metadata.get(name));
+ }
+ closeHeaders("headers", false, true);
+ writeKeyValue("body", getResponseContent());
+ closeObject("response");
+
+ // key
+ if (!this.keyPrefix.isEmpty()) {
+ this.keyPrefix += "-";
+ }
+ writeKeyValue("key", this.keyPrefix + getKey());
+
+ // imported
+ writeKeyValue("imported", getImported());
+
+ if (getInLinks() != null){
+ startArray("inlinks", false, true);
+ for (String link : getInLinks()) {
+ writeArrayValue(link);
+ }
+ closeArray("inlinks", false, true);
+ }
+ closeObject(null);
+
+ return generateJson();
+
+ } catch (IOException ioe) {
+ LOG.warn("Error in processing file " + url + ": " + ioe.getMessage());
+ throw new IOException("Error in generating JSON:" + ioe.getMessage());
+ }
+ }
+
+ // abstract methods
+
+ protected abstract void writeKeyValue(String key, String value) throws IOException;
+
+ protected abstract void writeKeyNull(String key) throws IOException;
+
+ protected abstract void startArray(String key, boolean nested, boolean newline) throws IOException;
+
+ protected abstract void closeArray(String key, boolean nested, boolean newline) throws IOException;
+
+ protected abstract void writeArrayValue(String value) throws IOException;
+
+ protected abstract void startObject(String key) throws IOException;
+
+ protected abstract void closeObject(String key) throws IOException;
+
+ protected abstract String generateJson() throws IOException;
+
+ // getters
+
+ protected String getUrl() {
+ try {
+ return URIUtil.encodePath(url);
+ } catch (URIException e) {
+ LOG.error("Can't encode URL " + url);
+ }
+
+ return url;
+ }
+
+ protected String getTimestamp() {
+ if (this.simpleDateFormat) {
+ String timestamp = null;
+ try {
+ long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get(Metadata.LAST_MODIFIED))).getTime();
+ timestamp = String.valueOf(epoch);
+ } catch (ParseException pe) {
+ LOG.warn(pe.getMessage());
+ }
+ return timestamp;
+ } else {
+ return ifNullString(metadata.get(Metadata.LAST_MODIFIED));
+ }
+ }
+
+ protected String getMethod() {
+ return new String("GET");
+ }
+
+ protected String getRequestHostName() {
+ String hostName = "";
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException uhe) {
+
+ }
+ return hostName;
+ }
+
+ protected String getRequestHostAddress() {
+ String hostAddress = "";
+ try {
+ hostAddress = InetAddress.getLocalHost().getHostAddress();
+ } catch (UnknownHostException uhe) {
+
+ }
+ return hostAddress;
+ }
+
+ protected String getRequestSoftware() {
+ return conf.get("http.agent.version", "");
+ }
+
+ protected String getRequestRobots() {
+ return new String("CLASSIC");
+ }
+
+ protected String getRequestContactName() {
+ return conf.get("http.agent.name", "");
+ }
+
+ protected String getRequestContactEmail() {
+ return conf.get("http.agent.email", "");
+ }
+
+ protected String getRequestAccept() {
+ return conf.get("http.accept", "");
+ }
+
+ protected String getRequestAcceptEncoding() {
+ return new String(""); // TODO
+ }
+
+ protected String getRequestAcceptLanguage() {
+ return conf.get("http.accept.language", "");
+ }
+
+ protected String getRequestUserAgent() {
+ return conf.get("http.robots.agents", "");
+ }
+
+ protected String getResponseStatus() {
+ return ifNullString(metadata.get("status"));
+ }
+
+ protected String getResponseHostName() {
+ return URLUtil.getHost(url);
+ }
+
+ protected String getResponseAddress() {
+ return ifNullString(metadata.get("_ip_"));
+ }
+
+ protected String getResponseContentEncoding() {
+ return ifNullString(metadata.get("Content-Encoding"));
+ }
+
+ protected String getResponseContentType() {
+ return ifNullString(metadata.get("Content-Type"));
+ }
+
+ public List<String> getInLinks() {
+ return inLinks;
+ }
+
+ public void setInLinks(List<String> inLinks) {
+ this.inLinks = inLinks;
+ }
+
+ protected String getResponseDate() {
+ if (this.simpleDateFormat) {
+ String timestamp = null;
+ try {
+ long epoch = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
+ timestamp = String.valueOf(epoch);
+ } catch (ParseException pe) {
+ LOG.warn(pe.getMessage());
+ }
+ return timestamp;
+ } else {
+ return ifNullString(metadata.get("Date"));
+ }
+ }
+
+ protected String getResponseServer() {
+ return ifNullString(metadata.get("Server"));
+ }
+
+ protected String getResponseContent() {
+ return new String(content.getContent());
+ }
+
+ protected String getKey() {
+ if (this.reverseKey) {
+ return this.reverseKeyValue;
+ }
+ else {
+ return url;
+ }
+ }
+
+ protected String getImported() {
+ if (this.simpleDateFormat) {
+ String timestamp = null;
+ try {
+ long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
+ timestamp = String.valueOf(epoch);
+ } catch (ParseException pe) {
+ LOG.warn(pe.getMessage());
+ }
+ return timestamp;
+ } else {
+ return ifNullString(metadata.get("Date"));
+ }
+ }
+
+ private static String ifNullString(String value) {
+ return (value != null) ? value : "";
+ }
+
+ private void startHeaders(String key, boolean nested, boolean newline) throws IOException {
+ if (this.jsonArray) {
+ startArray(key, nested, newline);
+ }
+ else {
+ startObject(key);
+ }
+ }
+
+ private void closeHeaders(String key, boolean nested, boolean newline) throws IOException {
+ if (this.jsonArray) {
+ closeArray(key, nested, newline);
+ }
+ else {
+ closeObject(key);
+ }
+ }
+
+ private void writeKeyValueWrapper(String key, String value) throws IOException {
+ if (this.jsonArray) {
+ startArray(null, true, false);
+ writeArrayValue(key);
+ writeArrayValue(value);
+ closeArray(null, true, false);
+ }
+ else {
+ writeKeyValue(key, value);
+ }
+ }
+
+ @Override
+ public void close() {}
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java b/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java
new file mode 100755
index 0000000..ba42745
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.CrawlDbReader;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.Injector;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+public class Benchmark extends Configured implements Tool {
+ private static final Log LOG = LogFactory.getLog(Benchmark.class);
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = NutchConfiguration.create();
+ int res = ToolRunner.run(conf, new Benchmark(), args);
+ System.exit(res);
+ }
+
+ @SuppressWarnings("unused")
+ private static String getDate() {
+ return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System
+ .currentTimeMillis()));
+ }
+
+ private void createSeeds(FileSystem fs, Path seedsDir, int count)
+ throws Exception {
+ OutputStream os = fs.create(new Path(seedsDir, "seeds"));
+ for (int i = 0; i < count; i++) {
+ String url = "http://www.test-" + i + ".com/\r\n";
+ os.write(url.getBytes());
+ }
+ os.flush();
+ os.close();
+ }
+
+ public static final class BenchmarkResults {
+ Map<String, Map<String, Long>> timings = new HashMap<String, Map<String, Long>>();
+ List<String> runs = new ArrayList<String>();
+ List<String> stages = new ArrayList<String>();
+ int seeds, depth, threads;
+ boolean delete;
+ long topN;
+ long elapsed;
+ String plugins;
+
+ public void addTiming(String stage, String run, long timing) {
+ if (!runs.contains(run)) {
+ runs.add(run);
+ }
+ if (!stages.contains(stage)) {
+ stages.add(stage);
+ }
+ Map<String, Long> t = timings.get(stage);
+ if (t == null) {
+ t = new HashMap<String, Long>();
+ timings.put(stage, t);
+ }
+ t.put(run, timing);
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("* Plugins:\t" + plugins + "\n");
+ sb.append("* Seeds:\t" + seeds + "\n");
+ sb.append("* Depth:\t" + depth + "\n");
+ sb.append("* Threads:\t" + threads + "\n");
+ sb.append("* TopN:\t" + topN + "\n");
+ sb.append("* Delete:\t" + delete + "\n");
+ sb.append("* TOTAL ELAPSED:\t" + elapsed + "\n");
+ for (String stage : stages) {
+ Map<String, Long> timing = timings.get(stage);
+ if (timing == null)
+ continue;
+ sb.append("- stage: " + stage + "\n");
+ for (String r : runs) {
+ Long Time = timing.get(r);
+ if (Time == null) {
+ continue;
+ }
+ sb.append("\trun " + r + "\t" + Time + "\n");
+ }
+ }
+ return sb.toString();
+ }
+
+ public List<String> getStages() {
+ return stages;
+ }
+
+ public List<String> getRuns() {
+ return runs;
+ }
+ }
+
+ public int run(String[] args) throws Exception {
+ String plugins = "protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass";
+ int seeds = 1;
+ int depth = 10;
+ int threads = 10;
+ boolean delete = true;
+ long topN = Long.MAX_VALUE;
+
+ if (args.length == 0) {
+ System.err
+ .println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] [-keep] [-maxPerHost NN] [-plugins <regex>]");
+ System.err
+ .println("\t-seeds NN\tcreate NN unique hosts in a seed list (default: 1)");
+ System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)");
+ System.err
+ .println("\t-threads NN\tuse NN threads per Fetcher task (default: 10)");
+ System.err
+ .println("\t-keep\tkeep segment data (default: delete after updatedb)");
+ System.err.println("\t-plugins <regex>\toverride 'plugin.includes'.");
+ System.err.println("\tNOTE: if not specified, this is reset to: "
+ + plugins);
+ System.err
+ .println("\tNOTE: if 'default' is specified then a value set in nutch-default/nutch-site is used.");
+ System.err
+ .println("\t-maxPerHost NN\tmax. # of URLs per host in a fetchlist");
+ return -1;
+ }
+ int maxPerHost = Integer.MAX_VALUE;
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-seeds")) {
+ seeds = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-threads")) {
+ threads = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-depth")) {
+ depth = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-keep")) {
+ delete = false;
+ } else if (args[i].equals("-plugins")) {
+ plugins = args[++i];
+ } else if (args[i].equalsIgnoreCase("-maxPerHost")) {
+ maxPerHost = Integer.parseInt(args[++i]);
+ } else {
+ LOG.fatal("Invalid argument: '" + args[i] + "'");
+ return -1;
+ }
+ }
+ BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN,
+ delete, plugins);
+ System.out.println(res);
+ return 0;
+ }
+
+ public BenchmarkResults benchmark(int seeds, int depth, int threads,
+ int maxPerHost, long topN, boolean delete, String plugins)
+ throws Exception {
+ Configuration conf = getConf();
+ conf.set("http.proxy.host", "localhost");
+ conf.setInt("http.proxy.port", 8181);
+ conf.set("http.agent.name", "test");
+ conf.set("http.robots.agents", "test,*");
+ if (!plugins.equals("default")) {
+ conf.set("plugin.includes", plugins);
+ }
+ conf.setInt(Generator.GENERATOR_MAX_COUNT, maxPerHost);
+ conf.set(Generator.GENERATOR_COUNT_MODE,
+ Generator.GENERATOR_COUNT_VALUE_HOST);
+ JobConf job = new NutchJob(getConf());
+ FileSystem fs = FileSystem.get(job);
+ Path dir = new Path(getConf().get("hadoop.tmp.dir"), "bench-"
+ + System.currentTimeMillis());
+ fs.mkdirs(dir);
+ Path rootUrlDir = new Path(dir, "seed");
+ fs.mkdirs(rootUrlDir);
+ createSeeds(fs, rootUrlDir, seeds);
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("crawl started in: " + dir);
+ LOG.info("rootUrlDir = " + rootUrlDir);
+ LOG.info("threads = " + threads);
+ LOG.info("depth = " + depth);
+ }
+ BenchmarkResults res = new BenchmarkResults();
+ res.delete = delete;
+ res.depth = depth;
+ res.plugins = plugins;
+ res.seeds = seeds;
+ res.threads = threads;
+ res.topN = topN;
+ Path crawlDb = new Path(dir + "/crawldb");
+ Path linkDb = new Path(dir + "/linkdb");
+ Path segments = new Path(dir + "/segments");
+ res.elapsed = System.currentTimeMillis();
+ Injector injector = new Injector(getConf());
+ Generator generator = new Generator(getConf());
+ Fetcher fetcher = new Fetcher(getConf());
+ ParseSegment parseSegment = new ParseSegment(getConf());
+ CrawlDb crawlDbTool = new CrawlDb(getConf());
+ LinkDb linkDbTool = new LinkDb(getConf());
+
+ // initialize crawlDb
+ long start = System.currentTimeMillis();
+ injector.inject(crawlDb, rootUrlDir);
+ long delta = System.currentTimeMillis() - start;
+ res.addTiming("inject", "0", delta);
+ int i;
+ for (i = 0; i < depth; i++) { // generate new segment
+ start = System.currentTimeMillis();
+ Path[] segs = generator.generate(crawlDb, segments, -1, topN,
+ System.currentTimeMillis());
+ delta = System.currentTimeMillis() - start;
+ res.addTiming("generate", i + "", delta);
+ if (segs == null) {
+ LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
+ break;
+ }
+ start = System.currentTimeMillis();
+ fetcher.fetch(segs[0], threads); // fetch it
+ delta = System.currentTimeMillis() - start;
+ res.addTiming("fetch", i + "", delta);
+ if (!Fetcher.isParsing(job)) {
+ start = System.currentTimeMillis();
+ parseSegment.parse(segs[0]); // parse it, if needed
+ delta = System.currentTimeMillis() - start;
+ res.addTiming("parse", i + "", delta);
+ }
+ start = System.currentTimeMillis();
+ crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
+ delta = System.currentTimeMillis() - start;
+ res.addTiming("update", i + "", delta);
+ start = System.currentTimeMillis();
+ linkDbTool.invert(linkDb, segs, true, true, false); // invert links
+ delta = System.currentTimeMillis() - start;
+ res.addTiming("invert", i + "", delta);
+ // delete data
+ if (delete) {
+ for (Path p : segs) {
+ fs.delete(p, true);
+ }
+ }
+ }
+ if (i == 0) {
+ LOG.warn("No URLs to fetch - check your seed list and URL filters.");
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("crawl finished: " + dir);
+ }
+ res.elapsed = System.currentTimeMillis() - res.elapsed;
+ CrawlDbReader dbreader = new CrawlDbReader();
+ dbreader.processStatJob(crawlDb.toString(), job, false);
+ return res;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java
new file mode 100644
index 0000000..d8c06c0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Properties;
+
/**
 * Serializable holder for the options that control CommonCrawl-style output.
 * <p>
 * Options are set through the setters, or, for keyPrefix, simpleDateFormat,
 * jsonArray and reverseKey, loaded from a {@link Properties} stream passed to
 * {@link #CommonCrawlConfig(InputStream)}.
 */
public class CommonCrawlConfig implements Serializable {

  /**
   * Serial version UID
   */
  private static final long serialVersionUID = 5235013733207799661L;

  // The private field names below are part of the serialized form; keep them stable.

  // Prefix for key value in the output format
  private String keyPrefix = "";

  private boolean simpleDateFormat = false;

  private boolean jsonArray = false;

  private boolean reverseKey = false;

  private String reverseKeyValue = "";

  private boolean compressed = false;

  private long warcSize = 0;

  private String outputDir;

  /**
   * Creates a configuration with every option at its default value.
   */
  public CommonCrawlConfig() {
    // TODO init(this.getClass().getResourceAsStream("CommonCrawlConfig.properties"));
  }

  /**
   * Creates a configuration from a properties stream.
   * <p>
   * This reads the keys keyPrefix, simpleDateFormat, jsonArray and
   * reverseKey; flags are true only for a case-insensitive "true". The stream
   * is always closed. If it is {@code null} every option keeps its default,
   * and if it cannot be read the options are taken from whatever was read
   * before the failure.
   */
  public CommonCrawlConfig(InputStream stream) {
    init(stream);
  }

  /** Applies the recognized properties from {@code stream}; a null stream is ignored. */
  private void init(InputStream stream) {
    if (stream != null) {
      Properties loaded = readAndClose(stream);
      setKeyPrefix(loaded.getProperty("keyPrefix", ""));
      setSimpleDateFormat(flag(loaded, "simpleDateFormat"));
      setJsonArray(flag(loaded, "jsonArray"));
      setReverseKey(flag(loaded, "reverseKey"));
    }
  }

  /** Loads properties from {@code in}, closing it afterwards; read errors are ignored. */
  private static Properties readAndClose(InputStream in) {
    Properties loaded = new Properties();
    try {
      loaded.load(in);
    } catch (IOException ignored) {
      // TODO: an unreadable stream currently leaves the options at their defaults.
    } finally {
      try {
        in.close();
      } catch (IOException ignored) {
        // TODO: nothing useful to do if close fails.
      }
    }
    return loaded;
  }

  /** Parses a boolean option, treating a missing key as false. */
  private static boolean flag(Properties source, String key) {
    return Boolean.parseBoolean(source.getProperty(key, "False"));
  }

  public String getKeyPrefix() {
    return this.keyPrefix;
  }

  public void setKeyPrefix(String keyPrefix) {
    this.keyPrefix = keyPrefix;
  }

  public boolean getSimpleDateFormat() {
    return this.simpleDateFormat;
  }

  public void setSimpleDateFormat(boolean simpleDateFormat) {
    this.simpleDateFormat = simpleDateFormat;
  }

  public boolean getJsonArray() {
    return this.jsonArray;
  }

  public void setJsonArray(boolean jsonArray) {
    this.jsonArray = jsonArray;
  }

  public boolean getReverseKey() {
    return this.reverseKey;
  }

  public void setReverseKey(boolean reverseKey) {
    this.reverseKey = reverseKey;
  }

  public String getReverseKeyValue() {
    return this.reverseKeyValue;
  }

  public void setReverseKeyValue(String reverseKeyValue) {
    this.reverseKeyValue = reverseKeyValue;
  }

  public boolean isCompressed() {
    return this.compressed;
  }

  public void setCompressed(boolean compressed) {
    this.compressed = compressed;
  }

  public long getWarcSize() {
    return this.warcSize;
  }

  public void setWarcSize(long warcSize) {
    this.warcSize = warcSize;
  }

  public String getOutputDir() {
    return this.outputDir;
  }

  public void setOutputDir(String outputDir) {
    this.outputDir = outputDir;
  }
}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
new file mode 100644
index 0000000..b4fc0a7
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
//JDK imports

import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
//Commons imports
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.FilenameUtils;

//Hadoop
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.crawl.Inlink;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.crawl.LinkDbReader;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.util.DumpFileUtil;
import org.apache.nutch.util.NutchConfiguration;
//Tika imports
import org.apache.tika.Tika;

import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibm.icu.text.DateFormat;
import com.ibm.icu.text.SimpleDateFormat;
+
+/**
+ * <p>
+ * The Common Crawl Data Dumper tool enables one to reverse generate the raw
+ * content from Nutch segment data directories into a common crawling data
+ * format, consumed by many applications. The data is then serialized as <a
+ * href="http://cbor.io">CBOR</a>
+ * </p>
+ * <p>
+ * Text content will be stored in a structured document format. Below is a
+ * schema for storage of data and metadata related to a crawling request, with
+ * the response body truncated for readability. This document must be encoded
+ * using CBOR and should be compressed with gzip after encoding. The timestamped
+ * URL key for these records' keys follows the same layout as the media file
+ * directory structure, with underscores in place of directory separators.
+ * </p>
+ * <p>
+ * Thus, the timestamped url key for the record is provided below followed by an
+ * example record:
+ * <p/>
+ * <pre>
+ * {@code
+ * com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000
+ *
+ * {
+ * "url": "http:\/\/somepage.com\/22\/14560817",
+ * "timestamp": "1411623696000",
+ * "request": {
+ * "method": "GET",
+ * "client": {
+ * "hostname": "crawler01.local",
+ * "address": "74.347.129.200",
+ * "software": "Apache Nutch v1.10",
+ * "robots": "classic",
+ * "contact": {
+ * "name": "Nutch Admin",
+ * "email": "nutch.pro@nutchadmin.org"
+ * }
+ * },
+ * "headers": {
+ * "Accept": "text\/html,application\/xhtml+xml,application\/xml",
+ * "Accept-Encoding": "gzip,deflate,sdch",
+ * "Accept-Language": "en-US,en",
+ * "User-Agent": "Mozilla\/5.0",
+ * "...": "..."
+ * },
+ * "body": null
+ * },
+ * "response": {
+ * "status": "200",
+ * "server": {
+ * "hostname": "somepage.com",
+ * "address": "55.33.51.19"
+ * },
+ * "headers": {
+ * "Content-Encoding": "gzip",
+ * "Content-Type": "text\/html",
+ * "Date": "Thu, 25 Sep 2014 04:16:58 GMT",
+ * "Expires": "Thu, 25 Sep 2014 04:16:57 GMT",
+ * "Server": "nginx",
+ * "...": "..."
+ * },
+ * "body": "\r\n <!DOCTYPE html PUBLIC ... \r\n\r\n \r\n </body>\r\n </html>\r\n \r\n\r\n"
+ * },
+ * "key": "com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000",
+ * "imported": "1411623698000"
+ * }
+ * }
+ * </pre>
+ * <p/>
+ * <p>
+ * Upon successful completion the tool displays a very convenient JSON snippet
+ * detailing the mimetype classifications and the counts of documents which fall
+ * into those classifications. An example is as follows:
+ * </p>
+ * <p/>
+ * <pre>
+ * {@code
+ * INFO: File Types:
+ * TOTAL Stats: {
+ * {"mimeType":"application/xml","count":19"}
+ * {"mimeType":"image/png","count":47"}
+ * {"mimeType":"image/jpeg","count":141"}
+ * {"mimeType":"image/vnd.microsoft.icon","count":4"}
+ * {"mimeType":"text/plain","count":89"}
+ * {"mimeType":"video/quicktime","count":2"}
+ * {"mimeType":"image/gif","count":63"}
+ * {"mimeType":"application/xhtml+xml","count":1670"}
+ * {"mimeType":"application/octet-stream","count":40"}
+ * {"mimeType":"text/html","count":1863"}
+ * }
+ * }
+ * </pre>
+ */
+public class CommonCrawlDataDumper extends Configured implements Tool {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(CommonCrawlDataDumper.class.getName());
+ private static final int MAX_INLINKS = 5000;
+
+ private CommonCrawlConfig config = null;
+
+ // Gzip initialization
+ private FileOutputStream fileOutput = null;
+ private BufferedOutputStream bufOutput = null;
+ private GzipCompressorOutputStream gzipOutput = null;
+ private TarArchiveOutputStream tarOutput = null;
+ private ArrayList<String> fileList = null;
+
+ /**
+ * Main method for invoking this tool
+ *
+ * @param args 1) output directory (which will be created if it does not
+ * already exist) to host the CBOR data and 2) a directory
+ * containing one or more segments from which we wish to generate
+ * CBOR data from. Optionally, 3) a list of mimetypes and the 4)
+ * the gzip option may be provided.
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+ Configuration conf = NutchConfiguration.create();
+ int res = ToolRunner.run(conf, new CommonCrawlDataDumper(), args);
+ System.exit(res);
+ }
+
+ /**
+ * Constructor
+ */
+ public CommonCrawlDataDumper(CommonCrawlConfig config) {
+ this.config = config;
+ }
+
+ public CommonCrawlDataDumper() {
+ }
+
+ /**
+ * Dumps the reverse engineered CBOR content from the provided segment
+ * directories if a parent directory contains more than one segment,
+ * otherwise a single segment can be passed as an argument. If the boolean
+ * argument is provided then the CBOR is also zipped.
+ *
+ * @param outputDir the directory you wish to dump the raw content to. This
+ * directory will be created.
+ * @param segmentRootDir a directory containing one or more segments.
+ * @param linkdb Path to linkdb.
+ * @param gzip a boolean flag indicating whether the CBOR content should also
+ * be gzipped.
+ * @param epochFilename if {@code true}, output files will be names using the epoch time (in milliseconds).
+ * @param extension a file extension to use with output documents.
+ * @throws Exception if any exception occurs.
+ */
+ public void dump(File outputDir, File segmentRootDir, File linkdb, boolean gzip,
+ String[] mimeTypes, boolean epochFilename, String extension, boolean warc)
+ throws Exception {
+ if (gzip) {
+ LOG.info("Gzipping CBOR data has been skipped");
+ }
+ // total file counts
+ Map<String, Integer> typeCounts = new HashMap<String, Integer>();
+ // filtered file counters
+ Map<String, Integer> filteredCounts = new HashMap<String, Integer>();
+
+ Configuration nutchConfig = NutchConfiguration.create();
+ final FileSystem fs = FileSystem.get(nutchConfig);
+ Path segmentRootPath = new Path(segmentRootDir.toString());
+
+ //get all paths
+ List<Path> parts = new ArrayList<>();
+ RemoteIterator<LocatedFileStatus> files = fs.listFiles(segmentRootPath, true);
+ String partPattern = ".*" + File.separator + Content.DIR_NAME
+ + File.separator + "part-[0-9]{5}" + File.separator + "data";
+ while (files.hasNext()) {
+ LocatedFileStatus next = files.next();
+ if (next.isFile()) {
+ Path path = next.getPath();
+ if (path.toString().matches(partPattern)){
+ parts.add(path);
+ }
+ }
+ }
+
+ LinkDbReader linkDbReader = null;
+ if (linkdb != null) {
+ linkDbReader = new LinkDbReader(fs.getConf(), new Path(linkdb.toString()));
+ }
+ if (parts == null || parts.size() == 0) {
+ LOG.error( "No segment directories found in {} ",
+ segmentRootDir.getAbsolutePath());
+ System.exit(1);
+ }
+ LOG.info("Found {} segment parts", parts.size());
+ if (gzip && !warc) {
+ fileList = new ArrayList<>();
+ constructNewStream(outputDir);
+ }
+
+ for (Path segmentPart : parts) {
+ LOG.info("Processing segment Part : [ {} ]", segmentPart);
+ try {
+ SequenceFile.Reader reader = new SequenceFile.Reader(nutchConfig,
+ SequenceFile.Reader.file(segmentPart));
+
+ Writable key = (Writable) reader.getKeyClass().newInstance();
+
+ Content content = null;
+ while (reader.next(key)) {
+ content = new Content();
+ reader.getCurrentValue(content);
+ Metadata metadata = content.getMetadata();
+ String url = key.toString();
+
+ String baseName = FilenameUtils.getBaseName(url);
+ String extensionName = FilenameUtils.getExtension(url);
+
+ if (!extension.isEmpty()) {
+ extensionName = extension;
+ } else if ((extensionName == null) || extensionName.isEmpty()) {
+ extensionName = "html";
+ }
+
+ String outputFullPath = null;
+ String outputRelativePath = null;
+ String filename = null;
+ String timestamp = null;
+ String reverseKey = null;
+
+ if (epochFilename || config.getReverseKey()) {
+ try {
+ long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
+ .parse(getDate(metadata.get("Date"))).getTime();
+ timestamp = String.valueOf(epoch);
+ } catch (ParseException pe) {
+ LOG.warn(pe.getMessage());
+ }
+
+ reverseKey = reverseUrl(url);
+ config.setReverseKeyValue(
+ reverseKey.replace("/", "_") + "_" + DigestUtils.sha1Hex(url)
+ + "_" + timestamp);
+ }
+
+ if (!warc) {
+ if (epochFilename) {
+ outputFullPath = DumpFileUtil
+ .createFileNameFromUrl(outputDir.getAbsolutePath(),
+ reverseKey, url, timestamp, extensionName, !gzip);
+ outputRelativePath = outputFullPath
+ .substring(0, outputFullPath.lastIndexOf(File.separator) - 1);
+ filename = content.getMetadata().get(Metadata.DATE) + "."
+ + extensionName;
+ } else {
+ String md5Ofurl = DumpFileUtil.getUrlMD5(url);
+ String fullDir = DumpFileUtil
+ .createTwoLevelsDirectory(outputDir.getAbsolutePath(),
+ md5Ofurl, !gzip);
+ filename = DumpFileUtil
+ .createFileName(md5Ofurl, baseName, extensionName);
+ outputFullPath = String.format("%s/%s", fullDir, filename);
+
+ String[] fullPathLevels = fullDir.split(File.separator);
+ String firstLevelDirName = fullPathLevels[fullPathLevels.length
+ - 2];
+ String secondLevelDirName = fullPathLevels[fullPathLevels.length
+ - 1];
+ outputRelativePath = firstLevelDirName + secondLevelDirName;
+ }
+ }
+ // Encode all filetypes if no mimetypes have been given
+ Boolean filter = (mimeTypes == null);
+
+ String jsonData = "";
+ try {
+ String mimeType = new Tika().detect(content.getContent());
+ // Maps file to JSON-based structure
+
+ Set<String> inUrls = null; //there may be duplicates, so using set
+ if (linkDbReader != null) {
+ Inlinks inlinks = linkDbReader.getInlinks((Text) key);
+ if (inlinks != null) {
+ Iterator<Inlink> iterator = inlinks.iterator();
+ inUrls = new LinkedHashSet<>();
+ while (inUrls.size() <= MAX_INLINKS && iterator.hasNext()){
+ inUrls.add(iterator.next().getFromUrl());
+ }
+ }
+ }
+ //TODO: Make this Jackson Format implementation reusable
+ try (CommonCrawlFormat format = CommonCrawlFormatFactory
+ .getCommonCrawlFormat(warc ? "WARC" : "JACKSON", nutchConfig, config)) {
+ if (inUrls != null) {
+ format.setInLinks(new ArrayList<>(inUrls));
+ }
+ jsonData = format.getJsonData(url, content, metadata);
+ }
+
+ collectStats(typeCounts, mimeType);
+ // collects statistics for the given mimetypes
+ if ((mimeType != null) && (mimeTypes != null) && Arrays
+ .asList(mimeTypes).contains(mimeType)) {
+ collectStats(filteredCounts, mimeType);
+ filter = true;
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fatal error in creating JSON data: " + ioe.getMessage());
+ return;
+ }
+
+ if (!warc) {
+ if (filter) {
+ byte[] byteData = serializeCBORData(jsonData);
+
+ if (!gzip) {
+ File outputFile = new File(outputFullPath);
+ if (outputFile.exists()) {
+ LOG.info("Skipping writing: [" + outputFullPath
+ + "]: file already exists");
+ } else {
+ LOG.info("Writing: [" + outputFullPath + "]");
+ IOUtils.copy(new ByteArrayInputStream(byteData),
+ new FileOutputStream(outputFile));
+ }
+ } else {
+ if (fileList.contains(outputFullPath)) {
+ LOG.info("Skipping compressing: [" + outputFullPath
+ + "]: file already exists");
+ } else {
+ fileList.add(outputFullPath);
+ LOG.info("Compressing: [" + outputFullPath + "]");
+ //TarArchiveEntry tarEntry = new TarArchiveEntry(firstLevelDirName + File.separator + secondLevelDirName + File.separator + filename);
+ TarArchiveEntry tarEntry = new TarArchiveEntry(
+ outputRelativePath + File.separator + filename);
+ tarEntry.setSize(byteData.length);
+ tarOutput.putArchiveEntry(tarEntry);
+ tarOutput.write(byteData);
+ tarOutput.closeArchiveEntry();
+ }
+ }
+ }
+ }
+ }
+ reader.close();
+ } catch (Exception e){
+ LOG.warn("SKIPPED: {} Because : {}", segmentPart, e.getMessage());
+ } finally {
+ fs.close();
+ }
+ }
+
+ if (gzip && !warc) {
+ closeStream();
+ }
+
+ if (!typeCounts.isEmpty()) {
+ LOG.info("CommonsCrawlDataDumper File Stats: " + DumpFileUtil
+ .displayFileTypes(typeCounts, filteredCounts));
+ }
+
+ }
+
+ private void closeStream() {
+ try {
+ tarOutput.finish();
+
+ tarOutput.close();
+ gzipOutput.close();
+ bufOutput.close();
+ fileOutput.close();
+ } catch (IOException ioe) {
+ LOG.warn("Error in closing stream: " + ioe.getMessage());
+ }
+ }
+
+ private void constructNewStream(File outputDir) throws IOException {
+ String archiveName = new SimpleDateFormat("yyyyMMddhhmm'.tar.gz'")
+ .format(new Date());
+ LOG.info("Creating a new gzip archive: " + archiveName);
+ fileOutput = new FileOutputStream(
+ new File(outputDir + File.separator + archiveName));
+ bufOutput = new BufferedOutputStream(fileOutput);
+ gzipOutput = new GzipCompressorOutputStream(bufOutput);
+ tarOutput = new TarArchiveOutputStream(gzipOutput);
+ tarOutput.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+ }
+
+ /**
+ * Writes the CBOR "Self-Describe Tag" (value 55799, serialized as 3-byte
+ * sequence of {@code 0xd9d9f7}) at the current position. This method must
+ * be used to write the CBOR magic number at the beginning of the document.
+ * Since version 2.5, <a
+ * href="https://github.com/FasterXML/jackson-dataformat-cbor"
+ * >jackson-dataformat-cbor</a> will support the {@code WRITE_TYPE_HEADER}
+ * feature to write that type tag at the beginning of the document.
+ *
+ * @param generator {@link CBORGenerator} object used to create a CBOR-encoded document.
+ * @throws IOException if any I/O error occurs.
+ * @see <a href="https://tools.ietf.org/html/rfc7049#section-2.4.5">RFC
+ * 7049</a>
+ */
+ private void writeMagicHeader(CBORGenerator generator) throws IOException {
+ // Writes self-describe CBOR
+ // https://tools.ietf.org/html/rfc7049#section-2.4.5
+ // It will be supported in jackson-cbor since 2.5
+ byte[] header = new byte[3];
+ header[0] = (byte) 0xd9;
+ header[1] = (byte) 0xd9;
+ header[2] = (byte) 0xf7;
+ generator.writeBytes(header, 0, header.length);
+ }
+
+ private byte[] serializeCBORData(String jsonData) {
+ CBORFactory factory = new CBORFactory();
+
+ CBORGenerator generator = null;
+ ByteArrayOutputStream stream = null;
+
+ try {
+ stream = new ByteArrayOutputStream();
+ generator = factory.createGenerator(stream);
+ // Writes CBOR tag
+ writeMagicHeader(generator);
+ generator.writeString(jsonData);
+ generator.flush();
+ stream.flush();
+
+ return stream.toByteArray();
+
+ } catch (Exception e) {
+ LOG.warn("CBOR encoding failed: " + e.getMessage());
+ } finally {
+ try {
+ generator.close();
+ stream.close();
+ } catch (IOException e) {
+ // nothing to do
+ }
+ }
+
+ return null;
+ }
+
+ private void collectStats(Map<String, Integer> typeCounts, String mimeType) {
+ typeCounts.put(mimeType,
+ typeCounts.containsKey(mimeType) ? typeCounts.get(mimeType) + 1 : 1);
+ }
+
+ /**
+ * Gets the current date if the given timestamp is empty or null.
+ *
+ * @param timestamp the timestamp
+ * @return the current timestamp if the given one is null.
+ */
+ private String getDate(String timestamp) {
+ if (timestamp == null || timestamp.isEmpty()) {
+ DateFormat dateFormat = new SimpleDateFormat(
+ "EEE, d MMM yyyy HH:mm:ss z");
+ timestamp = dateFormat.format(new Date());
+ }
+ return timestamp;
+
+ }
+
+ public static String reverseUrl(String urlString) {
+ URL url;
+ String reverseKey = null;
+ try {
+ url = new URL(urlString);
+
+ String[] hostPart = url.getHost().replace('.', '/').split("/");
+
+ StringBuilder sb = new StringBuilder();
+ sb.append(hostPart[hostPart.length - 1]);
+ for (int i = hostPart.length - 2; i >= 0; i--) {
+ sb.append("/" + hostPart[i]);
+ }
+
+ reverseKey = sb.toString();
+
+ } catch (MalformedURLException e) {
+ LOG.error("Failed to parse URL: {}", urlString);
+ }
+
+ return reverseKey;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Option helpOpt = new Option("h", "help", false, "show this help message.");
+ // argument options
+ @SuppressWarnings("static-access")
+ Option outputOpt = OptionBuilder.withArgName("outputDir").hasArg()
+ .withDescription(
+ "output directory (which will be created) to host the CBOR data.")
+ .create("outputDir");
+ // WARC format
+ Option warcOpt = new Option("warc", "export to a WARC file");
+
+ @SuppressWarnings("static-access")
+ Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+ .withDescription("the segment or directory containing segments to use").create("segment");
+ // create mimetype and gzip options
+ @SuppressWarnings("static-access")
+ Option mimeOpt = OptionBuilder.isRequired(false).withArgName("mimetype")
+ .hasArgs().withDescription(
+ "an optional list of mimetypes to dump, excluding all others. Defaults to all.")
+ .create("mimetype");
+ @SuppressWarnings("static-access")
+ Option gzipOpt = OptionBuilder.withArgName("gzip").hasArg(false)
+ .withDescription(
+ "an optional flag indicating whether to additionally gzip the data.")
+ .create("gzip");
+ @SuppressWarnings("static-access")
+ Option keyPrefixOpt = OptionBuilder.withArgName("keyPrefix").hasArg(true)
+ .withDescription("an optional prefix for key in the output format.")
+ .create("keyPrefix");
+ @SuppressWarnings("static-access")
+ Option simpleDateFormatOpt = OptionBuilder.withArgName("SimpleDateFormat")
+ .hasArg(false).withDescription(
+ "an optional format for timestamp in GMT epoch milliseconds.")
+ .create("SimpleDateFormat");
+ @SuppressWarnings("static-access")
+ Option epochFilenameOpt = OptionBuilder.withArgName("epochFilename")
+ .hasArg(false)
+ .withDescription("an optional format for output filename.")
+ .create("epochFilename");
+ @SuppressWarnings("static-access")
+ Option jsonArrayOpt = OptionBuilder.withArgName("jsonArray").hasArg(false)
+ .withDescription("an optional format for JSON output.")
+ .create("jsonArray");
+ @SuppressWarnings("static-access")
+ Option reverseKeyOpt = OptionBuilder.withArgName("reverseKey").hasArg(false)
+ .withDescription("an optional format for key value in JSON output.")
+ .create("reverseKey");
+ @SuppressWarnings("static-access")
+ Option extensionOpt = OptionBuilder.withArgName("extension").hasArg(true)
+ .withDescription("an optional file extension for output documents.")
+ .create("extension");
+ @SuppressWarnings("static-access")
+ Option sizeOpt = OptionBuilder.withArgName("warcSize").hasArg(true)
+ .withType(Number.class)
+ .withDescription("an optional file size in bytes for the WARC file(s)")
+ .create("warcSize");
+ @SuppressWarnings("static-access")
+ Option linkDbOpt = OptionBuilder.withArgName("linkdb").hasArg(true)
+ .withDescription("an optional linkdb parameter to include inlinks in dump files")
+ .isRequired(false)
+ .create("linkdb");
+
+ // create the options
+ Options options = new Options();
+ options.addOption(helpOpt);
+ options.addOption(outputOpt);
+ options.addOption(segOpt);
+ // create mimetypes and gzip options
+ options.addOption(warcOpt);
+ options.addOption(mimeOpt);
+ options.addOption(gzipOpt);
+ // create keyPrefix option
+ options.addOption(keyPrefixOpt);
+ // create simpleDataFormat option
+ options.addOption(simpleDateFormatOpt);
+ options.addOption(epochFilenameOpt);
+ options.addOption(jsonArrayOpt);
+ options.addOption(reverseKeyOpt);
+ options.addOption(extensionOpt);
+ options.addOption(sizeOpt);
+ options.addOption(linkDbOpt);
+
+ CommandLineParser parser = new GnuParser();
+ try {
+ CommandLine line = parser.parse(options, args);
+ if (line.hasOption("help") || !line.hasOption("outputDir") || (!line
+ .hasOption("segment"))) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter
+ .printHelp(CommonCrawlDataDumper.class.getName(), options, true);
+ return 0;
+ }
+
+ File outputDir = new File(line.getOptionValue("outputDir"));
+ File segmentRootDir = new File(line.getOptionValue("segment"));
+ String[] mimeTypes = line.getOptionValues("mimetype");
+ boolean gzip = line.hasOption("gzip");
+ boolean epochFilename = line.hasOption("epochFilename");
+
+ String keyPrefix = line.getOptionValue("keyPrefix", "");
+ boolean simpleDateFormat = line.hasOption("SimpleDateFormat");
+ boolean jsonArray = line.hasOption("jsonArray");
+ boolean reverseKey = line.hasOption("reverseKey");
+ String extension = line.getOptionValue("extension", "");
+ boolean warc = line.hasOption("warc");
+ long warcSize = 0;
+
+ if (line.getParsedOptionValue("warcSize") != null) {
+ warcSize = (Long) line.getParsedOptionValue("warcSize");
+ }
+ String linkdbPath = line.getOptionValue("linkdb");
+ File linkdb = linkdbPath == null ? null : new File(linkdbPath);
+
+ CommonCrawlConfig config = new CommonCrawlConfig();
+ config.setKeyPrefix(keyPrefix);
+ config.setSimpleDateFormat(simpleDateFormat);
+ config.setJsonArray(jsonArray);
+ config.setReverseKey(reverseKey);
+ config.setCompressed(gzip);
+ config.setWarcSize(warcSize);
+ config.setOutputDir(line.getOptionValue("outputDir"));
+
+ if (!outputDir.exists()) {
+ LOG.warn("Output directory: [" + outputDir.getAbsolutePath()
+ + "]: does not exist, creating it.");
+ if (!outputDir.mkdirs())
+ throw new Exception(
+ "Unable to create: [" + outputDir.getAbsolutePath() + "]");
+ }
+
+ CommonCrawlDataDumper dumper = new CommonCrawlDataDumper(config);
+
+ dumper.dump(outputDir, segmentRootDir, linkdb, gzip, mimeTypes, epochFilename,
+ extension, warc);
+
+ } catch (Exception e) {
+ LOG.error(CommonCrawlDataDumper.class.getName() + ": " + StringUtils
+ .stringifyException(e));
+ e.printStackTrace();
+ return -1;
+ }
+
+ return 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java
new file mode 100644
index 0000000..0834d95
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for all CommonCrawl formatter. It provides the signature for the
+ * method used to get JSON data.
+ *
+ * @author gtotaro
+ *
+ */
+public interface CommonCrawlFormat extends Closeable {
+
+ /**
+ *
+ * @param mapAll If {@code true} maps all metdata on the JSON structure.
+ * @return the JSON data
+ */
+ //public String getJsonData(boolean mapAll) throws IOException;
+ public String getJsonData() throws IOException;
+
+ /**
+ * Returns a string representation of the JSON structure of the URL content
+ *
+ * @param url
+ * @param content
+ * @param metadata
+ * @return
+ */
+ public String getJsonData(String url, Content content, Metadata metadata)
+ throws IOException;
+
+ /**
+ * Returns a string representation of the JSON structure of the URL content
+ * takes into account the parsed metadata about the URL
+ *
+ * @param url
+ * @param content
+ * @param metadata
+ * @return
+ */
+ public String getJsonData(String url, Content content, Metadata metadata,
+ ParseData parseData) throws IOException;
+
+
+ /**
+ * sets inlinks of this document
+ * @param inLinks list of inlinks
+ */
+ void setInLinks(List<String> inLinks);
+
+
+ /**
+ * gets set of inlinks
+ * @return gets inlinks of this document
+ */
+ List<String> getInLinks();
+
+ /**
+ * Optional method that could be implemented if the actual format needs some
+ * close procedure.
+ */
+ public abstract void close();
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
new file mode 100644
index 0000000..8814168
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Factory class that creates new {@see CommonCrawlFormat} objects (a.k.a. formatter) that map crawled files to CommonCrawl format.
+ *
+ */
+public class CommonCrawlFormatFactory {
+
+ /**
+ * Returns a new instance of a {@see CommonCrawlFormat} object specifying the type of formatter.
+ * @param formatType the type of formatter to be created.
+ * @param url the url.
+ * @param content the content.
+ * @param metadata the metadata.
+ * @param nutchConf the configuration.
+ * @param config the CommonCrawl output configuration.
+ * @return the new {@see CommonCrawlFormat} object.
+ * @throws IOException If any I/O error occurs.
+ * @deprecated
+ */
+ public static CommonCrawlFormat getCommonCrawlFormat(String formatType, String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+ if (formatType == null) {
+ return null;
+ }
+
+ if (formatType.equalsIgnoreCase("jackson")) {
+ return new CommonCrawlFormatJackson(url, content, metadata, nutchConf, config);
+ }
+ else if (formatType.equalsIgnoreCase("jettinson")) {
+ return new CommonCrawlFormatJettinson(url, content, metadata, nutchConf, config);
+ }
+ else if (formatType.equalsIgnoreCase("simple")) {
+ return new CommonCrawlFormatSimple(url, content, metadata, nutchConf, config);
+ }
+
+ return null;
+ }
+
+ // The format should not depend on variable attributes, essentially this
+ // should be one for the full job
+ public static CommonCrawlFormat getCommonCrawlFormat(String formatType, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+ if (formatType.equalsIgnoreCase("WARC")) {
+ return new CommonCrawlFormatWARC(nutchConf, config);
+ }
+
+ if (formatType.equalsIgnoreCase("JACKSON")) {
+ return new CommonCrawlFormatJackson( nutchConf, config);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
new file mode 100644
index 0000000..0d6cae2
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * This class provides methods to map crawled data on JSON using the Jackson
+ * streaming API ({@link JsonGenerator}).
+ *
+ * <p>Output is pretty-printed and accumulated in an in-memory buffer;
+ * {@link #generateJson()} returns everything written so far, decoded as
+ * UTF-8.</p>
+ */
+public class CommonCrawlFormatJackson extends AbstractCommonCrawlFormat {
+
+  /** Buffer receiving the generator's encoded bytes. */
+  private final ByteArrayOutputStream out;
+
+  private final JsonGenerator generator;
+
+  public CommonCrawlFormatJackson(Configuration nutchConf,
+      CommonCrawlConfig config) throws IOException {
+    // Job-wide instance: no per-record url/content/metadata.
+    this(null, null, null, nutchConf, config);
+  }
+
+  public CommonCrawlFormatJackson(String url, Content content,
+      Metadata metadata, Configuration nutchConf, CommonCrawlConfig config)
+      throws IOException {
+    super(url, content, metadata, nutchConf, config);
+
+    JsonFactory factory = new JsonFactory();
+    this.out = new ByteArrayOutputStream();
+    // NOTE(review): the buffer is never reset in this class, so an instance
+    // reused across records accumulates output -- confirm callers expect this.
+    this.generator = factory.createGenerator(out);
+
+    this.generator.useDefaultPrettyPrinter(); // INDENTED OUTPUT
+  }
+
+  @Override
+  protected void writeKeyValue(String key, String value) throws IOException {
+    generator.writeFieldName(key);
+    generator.writeString(value);
+  }
+
+  @Override
+  protected void writeKeyNull(String key) throws IOException {
+    generator.writeFieldName(key);
+    generator.writeNull();
+  }
+
+  @Override
+  protected void startArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    if (key != null) {
+      generator.writeFieldName(key);
+    }
+    generator.writeStartArray();
+  }
+
+  @Override
+  protected void closeArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    generator.writeEndArray();
+  }
+
+  @Override
+  protected void writeArrayValue(String value) throws IOException {
+    generator.writeString(value);
+  }
+
+  @Override
+  protected void startObject(String key) throws IOException {
+    if (key != null) {
+      generator.writeFieldName(key);
+    }
+    generator.writeStartObject();
+  }
+
+  @Override
+  protected void closeObject(String key) throws IOException {
+    generator.writeEndObject();
+  }
+
+  @Override
+  protected String generateJson() throws IOException {
+    this.generator.flush();
+    // createGenerator(OutputStream) encodes UTF-8, so decode with the same
+    // charset instead of the platform default.
+    return this.out.toString("UTF-8");
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
new file mode 100644
index 0000000..6950e2a
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * This class provides methods to map crawled data on JSON using the Jettison
+ * APIs ({@link JSONObject} / {@link JSONArray}).
+ *
+ * <p>Objects and arrays under construction are kept on two stacks (innermost
+ * first). Closing an array or a nested object attaches it to its enclosing
+ * container; the outermost object is never popped, so
+ * {@link #generateJson()} can serialize it.</p>
+ */
+public class CommonCrawlFormatJettinson extends AbstractCommonCrawlFormat {
+
+  /** Open JSON objects, innermost first. */
+  private final Deque<JSONObject> stackObjects;
+
+  /** Open JSON arrays, innermost first. */
+  private final Deque<JSONArray> stackArrays;
+
+  public CommonCrawlFormatJettinson(String url, Content content,
+      Metadata metadata, Configuration nutchConf, CommonCrawlConfig config)
+      throws IOException {
+    super(url, content, metadata, nutchConf, config);
+
+    stackObjects = new ArrayDeque<JSONObject>();
+    stackArrays = new ArrayDeque<JSONArray>();
+  }
+
+  @Override
+  protected void writeKeyValue(String key, String value) throws IOException {
+    try {
+      stackObjects.getFirst().put(key, value);
+    } catch (JSONException jsone) {
+      throw new IOException(jsone.getMessage(), jsone);
+    }
+  }
+
+  @Override
+  protected void writeKeyNull(String key) throws IOException {
+    try {
+      stackObjects.getFirst().put(key, JSONObject.NULL);
+    } catch (JSONException jsone) {
+      throw new IOException(jsone.getMessage(), jsone);
+    }
+  }
+
+  @Override
+  protected void startArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    stackArrays.push(new JSONArray());
+  }
+
+  @Override
+  protected void closeArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    if (stackArrays.isEmpty()) {
+      return; // unbalanced close: nothing to attach
+    }
+    // Unlike objects there is no root array to keep, so every array is popped
+    // and attached; otherwise the first top-level array would be lost.
+    JSONArray array = stackArrays.pop();
+    try {
+      if (nested && !stackArrays.isEmpty()) {
+        stackArrays.getFirst().put(array);
+      } else if (key != null && !stackObjects.isEmpty()) {
+        stackObjects.getFirst().put(key, array);
+      }
+    } catch (JSONException jsone) {
+      throw new IOException(jsone.getMessage(), jsone);
+    }
+  }
+
+  @Override
+  protected void writeArrayValue(String value) throws IOException {
+    // Append to the innermost open array; a value outside any array is ignored.
+    if (!stackArrays.isEmpty()) {
+      stackArrays.getFirst().put(value);
+    }
+  }
+
+  @Override
+  protected void startObject(String key) throws IOException {
+    stackObjects.push(new JSONObject());
+  }
+
+  @Override
+  protected void closeObject(String key) throws IOException {
+    try {
+      // Keep the outermost object on the stack for generateJson().
+      if (stackObjects.size() > 1) {
+        JSONObject object = stackObjects.pop();
+        stackObjects.getFirst().put(key, object);
+      }
+    } catch (JSONException jsone) {
+      throw new IOException(jsone.getMessage(), jsone);
+    }
+  }
+
+  @Override
+  protected String generateJson() throws IOException {
+    try {
+      return stackObjects.getFirst().toString(2);
+    } catch (JSONException jsone) {
+      throw new IOException(jsone.getMessage(), jsone);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
new file mode 100644
index 0000000..a1aaa44
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * This class provides methods to map crawled data on JSON using a {@see StringBuilder} object.
+ *
+ */
+public class CommonCrawlFormatSimple extends AbstractCommonCrawlFormat {
+
+ private StringBuilder sb;
+
+ private int tabCount;
+
+ public CommonCrawlFormatSimple(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+ super(url, content, metadata, nutchConf, config);
+
+ this.sb = new StringBuilder();
+ this.tabCount = 0;
+ }
+
+ @Override
+ protected void writeKeyValue(String key, String value) throws IOException {
+ sb.append(printTabs() + "\"" + key + "\": " + quote(value) + ",\n");
+ }
+
+ @Override
+ protected void writeKeyNull(String key) throws IOException {
+ sb.append(printTabs() + "\"" + key + "\": null,\n");
+ }
+
+ @Override
+ protected void startArray(String key, boolean nested, boolean newline) throws IOException {
+ String name = (key != null) ? "\"" + key + "\": " : "";
+ String nl = (newline) ? "\n" : "";
+ sb.append(printTabs() + name + "[" + nl);
+ if (newline) {
+ this.tabCount++;
+ }
+ }
+
+ @Override
+ protected void closeArray(String key, boolean nested, boolean newline) throws IOException {
+ if (sb.charAt(sb.length()-1) == ',') {
+ sb.deleteCharAt(sb.length()-1); // delete comma
+ }
+ else if (sb.charAt(sb.length()-2) == ',') {
+ sb.deleteCharAt(sb.length()-2); // delete comma
+ }
+ String nl = (newline) ? printTabs() : "";
+ if (newline) {
+ this.tabCount++;
+ }
+ sb.append(nl + "],\n");
+ }
+
+ @Override
+ protected void writeArrayValue(String value) {
+ sb.append("\"" + value + "\",");
+ }
+
+ protected void startObject(String key) throws IOException {
+ String name = "";
+ if (key != null) {
+ name = "\"" + key + "\": ";
+ }
+ sb.append(printTabs() + name + "{\n");
+ this.tabCount++;
+ }
+
+ protected void closeObject(String key) throws IOException {
+ if (sb.charAt(sb.length()-2) == ',') {
+ sb.deleteCharAt(sb.length()-2); // delete comma
+ }
+ this.tabCount--;
+ sb.append(printTabs() + "},\n");
+ }
+
+ protected String generateJson() throws IOException {
+ sb.deleteCharAt(sb.length()-1); // delete new line
+ sb.deleteCharAt(sb.length()-1); // delete comma
+ return sb.toString();
+ }
+
+ private String printTabs() {
+ StringBuilder sb = new StringBuilder();
+ for (int i=0; i < this.tabCount ;i++) {
+ sb.append("\t");
+ }
+ return sb.toString();
+ }
+
+ private static String quote(String string) throws IOException {
+ StringBuilder sb = new StringBuilder();
+
+ if (string == null || string.length() == 0) {
+ sb.append("\"\"");
+ return sb.toString();
+ }
+
+ char b;
+ char c = 0;
+ String hhhh;
+ int i;
+ int len = string.length();
+
+ sb.append('"');
+ for (i = 0; i < len; i += 1) {
+ b = c;
+ c = string.charAt(i);
+ switch (c) {
+ case '\\':
+ case '"':
+ sb.append('\\');
+ sb.append(c);
+ break;
+ case '/':
+ if (b == '<') {
+ sb.append('\\');
+ }
+ sb.append(c);
+ break;
+ case '\b':
+ sb.append("\\b");
+ break;
+ case '\t':
+ sb.append("\\t");
+ break;
+ case '\n':
+ sb.append("\\n");
+ break;
+ case '\f':
+ sb.append("\\f");
+ break;
+ case '\r':
+ sb.append("\\r");
+ break;
+ default:
+ if (c < ' ' || (c >= '\u0080' && c < '\u00a0')
+ || (c >= '\u2000' && c < '\u2100')) {
+ sb.append("\\u");
+ hhhh = Integer.toHexString(c);
+ sb.append("0000", 0, 4 - hhhh.length());
+ sb.append(hhhh);
+ } else {
+ sb.append(c);
+ }
+ }
+ }
+ sb.append('"');
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
new file mode 100644
index 0000000..191e42e
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
@@ -0,0 +1,286 @@
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.text.ParseException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.ibm.icu.text.SimpleDateFormat;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.protocol.Content;
+import org.archive.format.warc.WARCConstants;
+import org.archive.io.WriterPoolMember;
+import org.archive.io.warc.WARCRecordInfo;
+import org.archive.io.warc.WARCWriter;
+import org.archive.io.warc.WARCWriterPoolSettingsData;
+import org.archive.uid.UUIDGenerator;
+import org.archive.util.DateUtils;
+import org.archive.util.anvl.ANVLRecord;
+
+/**
+ * Writes crawled records as WARC files into the configured output directory
+ * using the webarchive-commons {@link WARCWriter}.
+ *
+ * <p>Unlike the JSON formats, {@link #getJsonData()} writes records through
+ * the writer and always returns {@code null}. The JSON-building callbacks
+ * ({@code writeKeyValue}, {@code startObject}, ...) throw
+ * {@link NotImplementedException}.</p>
+ */
+public class CommonCrawlFormatWARC extends AbstractCommonCrawlFormat {
+
+  public static final String MAX_WARC_FILE_SIZE = "warc.file.size.max";
+  public static final String TEMPLATE = "${prefix}-${timestamp17}-${serialno}";
+
+  private static final AtomicInteger SERIALNO = new AtomicInteger();
+  private static final UUIDGenerator GENERATOR = new UUIDGenerator();
+
+  private String outputDir;
+  private final WARCWriter writer;
+  private ParseData parseData;
+
+  public CommonCrawlFormatWARC(Configuration nutchConf,
+      CommonCrawlConfig config) throws IOException {
+    super(null, null, null, nutchConf, config);
+
+    this.writer = createWriter(nutchConf, config);
+  }
+
+  public CommonCrawlFormatWARC(String url, Content content, Metadata metadata,
+      Configuration nutchConf, CommonCrawlConfig config, ParseData parseData)
+      throws IOException {
+    super(url, content, metadata, nutchConf, config);
+
+    this.parseData = parseData;
+    // Reads the inherited conf field rather than the nutchConf parameter.
+    this.writer = createWriter(conf, config);
+  }
+
+  /**
+   * Creates the WARC writer, writing into {@code config.getOutputDir()} with
+   * a warcinfo record built from {@code warcConf}.
+   *
+   * @throws RuntimeException if no output directory is configured.
+   */
+  private WARCWriter createWriter(Configuration warcConf,
+      CommonCrawlConfig config) throws IOException {
+    ANVLRecord info = WARCUtils.getWARCInfoContent(warcConf);
+    List<String> md = Collections.singletonList(info.toString());
+
+    this.outputDir = config.getOutputDir();
+
+    if (null == outputDir) {
+      String message = "Missing output directory configuration: " + outputDir;
+
+      throw new RuntimeException(message);
+    }
+
+    File file = new File(outputDir);
+
+    // Use the library default unless a positive size is configured.
+    long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE;
+    if (config.getWarcSize() > 0) {
+      maxSize = config.getWarcSize();
+    }
+
+    WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData(
+        WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize,
+        config.isCompressed(), Arrays.asList(new File[] { file }), md,
+        new UUIDGenerator());
+
+    return new WARCWriter(SERIALNO, settings);
+  }
+
+  public String getJsonData(String url, Content content, Metadata metadata,
+      ParseData parseData) throws IOException {
+    this.url = url;
+    this.content = content;
+    this.metadata = metadata;
+    this.parseData = parseData;
+
+    return this.getJsonData();
+  }
+
+  /**
+   * Writes the current record (response or resource, plus a request record
+   * when {@code _request_} metadata is present). Records without a valid
+   * fetch date are logged and skipped.
+   *
+   * @return always {@code null}; the output goes to the WARC files.
+   */
+  @Override
+  public String getJsonData() throws IOException {
+    // The position itself is unused. The call is kept before checkSize()
+    // because it may flush buffered output, so the size check sees the real
+    // file length. NOTE(review): confirm against the webarchive-commons
+    // version in use.
+    writer.getPosition();
+
+    try {
+      // checkSize will open a new file if we exceeded the maxBytes setting
+      writer.checkSize();
+
+      // response record
+      URI id = writeResponse();
+
+      if (StringUtils.isNotBlank(metadata.get("_request_"))) {
+        // write the request method if any request info is found
+        writeRequest(id);
+      }
+    } catch (ParseException e) {
+      // do nothing, as we can't establish a valid WARC-Date for this record
+      // lets skip it altogether
+      LOG.error("Can't get a valid date from: {}", url);
+    } catch (NumberFormatException e) {
+      // missing or malformed nutch.fetch.time: no valid WARC-Date either, so
+      // skip this record too instead of aborting the whole dump
+      LOG.error("Can't get a valid date from: {}", url);
+    }
+
+    return null;
+  }
+
+  /**
+   * Writes the response record for the current url, or a resource record
+   * when no HTTP response headers were captured. Nothing is written when the
+   * record body would be empty.
+   *
+   * @return the record id assigned to the record, even when it was not
+   *         written because it was empty.
+   * @throws NumberFormatException if {@code nutch.fetch.time} is missing or
+   *         not a number.
+   */
+  protected URI writeResponse() throws IOException, ParseException {
+    WARCRecordInfo record = new WARCRecordInfo();
+
+    record.setType(WARCConstants.WARCRecordType.response);
+    record.setUrl(getUrl());
+
+    record.setCreate14DigitDate(DateUtils
+        .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+    record.setMimetype(WARCConstants.HTTP_RESPONSE_MIMETYPE);
+    record.setRecordId(GENERATOR.getRecordID());
+
+    String IP = getResponseAddress();
+
+    if (StringUtils.isNotBlank(IP)) {
+      record.addExtraHeader(WARCConstants.HEADER_KEY_IP, IP);
+    }
+
+    if (ParseSegment.isTruncated(content)) {
+      record.addExtraHeader(WARCConstants.HEADER_KEY_TRUNCATED, "unspecified");
+    }
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+    String httpHeaders = metadata.get("_response.headers_");
+
+    if (StringUtils.isNotBlank(httpHeaders)) {
+      output.write(httpHeaders.getBytes());
+    } else {
+      // change the record type to resource as we not have information about
+      // the headers
+      record.setType(WARCConstants.WARCRecordType.resource);
+      record.setMimetype(content.getContentType());
+    }
+
+    output.write(getResponseContent().getBytes());
+
+    record.setContentLength(output.size());
+    record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+    if (output.size() > 0) {
+      // avoid generating a 0 sized record, as the webarchive library will
+      // complain about it
+      writer.writeRecord(record);
+    }
+
+    return record.getRecordId();
+  }
+
+  /**
+   * Writes the request record built from the {@code _request_} metadata.
+   *
+   * @param id record id of the matching response, referenced through
+   *          WARC-Concurrent-To; may be {@code null}.
+   * @return the record id of the request record.
+   * @throws NumberFormatException if {@code nutch.fetch.time} is missing or
+   *         not a number.
+   */
+  protected URI writeRequest(URI id) throws IOException, ParseException {
+    WARCRecordInfo record = new WARCRecordInfo();
+
+    record.setType(WARCConstants.WARCRecordType.request);
+    record.setUrl(getUrl());
+    record.setCreate14DigitDate(DateUtils
+        .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+    record.setMimetype(WARCConstants.HTTP_REQUEST_MIMETYPE);
+    record.setRecordId(GENERATOR.getRecordID());
+
+    if (id != null) {
+      ANVLRecord headers = new ANVLRecord();
+      headers.addLabelValue(WARCConstants.HEADER_KEY_CONCURRENT_TO,
+          '<' + id.toString() + '>');
+      record.setExtraHeaders(headers);
+    }
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+    output.write(metadata.get("_request_").getBytes());
+    record.setContentLength(output.size());
+    record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+    writer.writeRecord(record);
+
+    return record.getRecordId();
+  }
+
+  @Override
+  protected String generateJson() throws IOException {
+    return null;
+  }
+
+  @Override
+  protected void writeKeyValue(String key, String value) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void writeKeyNull(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void startArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void closeArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void writeArrayValue(String value) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void startObject(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void closeObject(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Closes the underlying WARC writer; I/O failures are rethrown unchecked. */
+  @Override
+  public void close() {
+    if (writer == null) {
+      return;
+    }
+    try {
+      writer.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}