Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:52 UTC

[36/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
new file mode 100644
index 0000000..1b425c4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/AbstractCommonCrawlFormat.java
@@ -0,0 +1,393 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.text.ParseException;
+import java.util.List;
+
+import org.apache.commons.httpclient.URIException;
+import org.apache.commons.httpclient.util.URIUtil;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ibm.icu.text.SimpleDateFormat;
+
+/**
+ * Abstract class that implements the {@link CommonCrawlFormat} interface.
+ */
+public abstract class AbstractCommonCrawlFormat implements CommonCrawlFormat {
+  protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommonCrawlFormat.class.getName());
+
+  protected String url;
+
+  protected Content content;
+
+  protected Metadata metadata;
+
+  protected Configuration conf;
+
+  protected String keyPrefix;
+
+  protected boolean simpleDateFormat;
+
+  protected boolean jsonArray;
+
+  protected boolean reverseKey;
+
+  protected String reverseKeyValue;
+
+  protected List<String> inLinks;
+
+  public AbstractCommonCrawlFormat(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+    this.url = url;
+    this.content = content;
+    this.metadata = metadata;
+    this.conf = nutchConf;
+
+    this.keyPrefix = config.getKeyPrefix();
+    this.simpleDateFormat = config.getSimpleDateFormat();
+    this.jsonArray = config.getJsonArray();
+    this.reverseKey = config.getReverseKey();
+    this.reverseKeyValue = config.getReverseKeyValue();
+  }
+
+  public String getJsonData(String url, Content content, Metadata metadata)
+      throws IOException {
+    this.url = url;
+    this.content = content;
+    this.metadata = metadata;
+
+    return this.getJsonData();
+  }
+
+  public String getJsonData(String url, Content content, Metadata metadata,
+      ParseData parseData) throws IOException {
+
+    // concrete formats that consume parse data must override this method
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public String getJsonData() throws IOException {
+    try {
+      startObject(null);
+
+      // url
+      writeKeyValue("url", getUrl());
+
+      // timestamp
+      writeKeyValue("timestamp", getTimestamp());
+
+      // request
+      startObject("request");
+      writeKeyValue("method", getMethod());
+      startObject("client");
+      writeKeyValue("hostname", getRequestHostName());
+      writeKeyValue("address", getRequestHostAddress());
+      writeKeyValue("software", getRequestSoftware());
+      writeKeyValue("robots", getRequestRobots());
+      startObject("contact");
+      writeKeyValue("name", getRequestContactName());
+      writeKeyValue("email", getRequestContactEmail());
+      closeObject("contact");
+      closeObject("client");
+      // start request headers
+      startHeaders("headers", false, true);
+      writeKeyValueWrapper("Accept", getRequestAccept());
+      writeKeyValueWrapper("Accept-Encoding", getRequestAcceptEncoding());
+      writeKeyValueWrapper("Accept-Language", getRequestAcceptLanguage());
+      writeKeyValueWrapper("User-Agent", getRequestUserAgent());
+      //closeObject("headers");
+      closeHeaders("headers", false, true);
+      writeKeyNull("body");
+      closeObject("request");
+
+      // response
+      startObject("response");
+      writeKeyValue("status", getResponseStatus());
+      startObject("server");
+      writeKeyValue("hostname", getResponseHostName());
+      writeKeyValue("address", getResponseAddress());
+      closeObject("server");
+      // start response headers
+      startHeaders("headers", false, true);
+      writeKeyValueWrapper("Content-Encoding", getResponseContentEncoding());
+      writeKeyValueWrapper("Content-Type", getResponseContentType());
+      writeKeyValueWrapper("Date", getResponseDate());
+      writeKeyValueWrapper("Server", getResponseServer());
+      for (String name : metadata.names()) {
+        if (name.equalsIgnoreCase("Content-Encoding") || name.equalsIgnoreCase("Content-Type") || name.equalsIgnoreCase("Date") || name.equalsIgnoreCase("Server")) {
+          continue;
+        }
+        writeKeyValueWrapper(name, metadata.get(name));
+      }
+      closeHeaders("headers", false, true);
+      writeKeyValue("body", getResponseContent());
+      closeObject("response");
+
+      // key (use a local copy so repeated calls do not keep appending "-")
+      String prefix = this.keyPrefix;
+      if (!prefix.isEmpty()) {
+        prefix += "-";
+      }
+      writeKeyValue("key", prefix + getKey());
+
+      // imported
+      writeKeyValue("imported", getImported());
+
+      if (getInLinks() != null){
+        startArray("inlinks", false, true);
+        for (String link : getInLinks()) {
+          writeArrayValue(link);
+        }
+        closeArray("inlinks", false, true);
+      }
+      closeObject(null);
+
+      return generateJson();
+
+    } catch (IOException ioe) {
+      LOG.warn("Error in processing file " + url + ": " + ioe.getMessage());
+      throw new IOException("Error in generating JSON: " + ioe.getMessage(), ioe);
+    }
+  }
+
+  // abstract methods
+
+  protected abstract void writeKeyValue(String key, String value) throws IOException;
+
+  protected abstract void writeKeyNull(String key) throws IOException;
+
+  protected abstract void startArray(String key, boolean nested, boolean newline) throws IOException;
+
+  protected abstract void closeArray(String key, boolean nested, boolean newline) throws IOException;
+
+  protected abstract void writeArrayValue(String value) throws IOException;
+
+  protected abstract void startObject(String key) throws IOException;
+
+  protected abstract void closeObject(String key) throws IOException;
+
+  protected abstract String generateJson() throws IOException;
+
+  // getters
+
+  protected String getUrl() {
+    try {
+      return URIUtil.encodePath(url);
+    } catch (URIException e) {
+      LOG.error("Can't encode URL " + url);
+    }
+
+    return url;
+  }
+
+  protected String getTimestamp() {
+    if (this.simpleDateFormat) {
+      String timestamp = null;
+      try {
+        long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get(Metadata.LAST_MODIFIED))).getTime();
+        timestamp = String.valueOf(epoch);
+      } catch (ParseException pe) {
+        LOG.warn(pe.getMessage());
+      }
+      return timestamp;
+    } else {
+      return ifNullString(metadata.get(Metadata.LAST_MODIFIED));
+    }
+  }
+
+  protected String getMethod() {
+    return "GET";
+  }
+
+  protected String getRequestHostName() {
+    String hostName = "";
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException uhe) {
+      // fall through: an empty hostname is written if the lookup fails
+    }
+    return hostName;
+  }
+
+  protected String getRequestHostAddress() {
+    String hostAddress = "";
+    try {
+      hostAddress = InetAddress.getLocalHost().getHostAddress();
+    } catch (UnknownHostException uhe) {
+      // fall through: an empty address is written if the lookup fails
+    }
+    return hostAddress;
+  }
+
+  protected String getRequestSoftware() {
+    return conf.get("http.agent.version", "");
+  }
+
+  protected String getRequestRobots() {
+    return "CLASSIC";
+  }
+
+  protected String getRequestContactName() {
+    return conf.get("http.agent.name", "");
+  }
+
+  protected String getRequestContactEmail() {
+    return conf.get("http.agent.email", "");
+  }
+
+  protected String getRequestAccept() {
+    return conf.get("http.accept", "");
+  }
+
+  protected String getRequestAcceptEncoding() {
+    return ""; // TODO
+  }
+
+  protected String getRequestAcceptLanguage() {
+    return conf.get("http.accept.language", "");
+  }
+
+  protected String getRequestUserAgent() {
+    return conf.get("http.robots.agents", "");
+  }
+
+  protected String getResponseStatus() {
+    return ifNullString(metadata.get("status"));
+  }
+
+  protected String getResponseHostName() {
+    return URLUtil.getHost(url);
+  }
+
+  protected String getResponseAddress() {
+    return ifNullString(metadata.get("_ip_"));
+  }
+
+  protected String getResponseContentEncoding() {
+    return ifNullString(metadata.get("Content-Encoding"));
+  }
+
+  protected String getResponseContentType() {
+    return ifNullString(metadata.get("Content-Type"));
+  }
+
+  public List<String> getInLinks() {
+    return inLinks;
+  }
+
+  public void setInLinks(List<String> inLinks) {
+    this.inLinks = inLinks;
+  }
+
+  protected String getResponseDate() {
+    if (this.simpleDateFormat) {
+      String timestamp = null;
+      try {
+        long epoch = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
+        timestamp = String.valueOf(epoch);
+      } catch (ParseException pe) {
+        LOG.warn(pe.getMessage());
+      }
+      return timestamp;
+    } else {
+      return ifNullString(metadata.get("Date"));
+    }
+  }
+
+  protected String getResponseServer() {
+    return ifNullString(metadata.get("Server"));
+  }
+
+  protected String getResponseContent() {
+    // note: decodes the raw bytes with the platform default charset
+    return new String(content.getContent());
+  }
+
+  protected String getKey() {
+    if (this.reverseKey) {
+      return this.reverseKeyValue;
+    }
+    else {
+      return url;
+    }
+  }
+
+  protected String getImported() {
+    if (this.simpleDateFormat) {
+      String timestamp = null;
+      try {
+        long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z").parse(ifNullString(metadata.get("Date"))).getTime();
+        timestamp = String.valueOf(epoch);
+      } catch (ParseException pe) {
+        LOG.warn(pe.getMessage());
+      }
+      return timestamp;
+    } else {
+      return ifNullString(metadata.get("Date"));
+    }
+  }
+
+  private static String ifNullString(String value) {
+    return (value != null) ? value : "";
+  }
+
+  private void startHeaders(String key, boolean nested, boolean newline) throws IOException {
+    if (this.jsonArray) {
+      startArray(key, nested, newline);
+    }
+    else {
+      startObject(key);
+    }
+  }
+
+  private void closeHeaders(String key, boolean nested, boolean newline) throws IOException {
+    if (this.jsonArray) {
+      closeArray(key, nested, newline);
+    }
+    else {
+      closeObject(key);
+    }
+  }
+
+  private void writeKeyValueWrapper(String key, String value) throws IOException {
+    if (this.jsonArray) {
+      startArray(null, true, false);
+      writeArrayValue(key);
+      writeArrayValue(value);
+      closeArray(null, true, false);
+    }
+    else {
+      writeKeyValue(key, value);
+    }
+  }
+
+  @Override
+  public void close() {}
+}
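
None of the concrete formats appear in this hunk, but the template hooks above are easy to picture in terms of a streaming JSON writer. Below is a minimal sketch of a concrete subclass, assuming a hypothetical SimpleJsonFormat backed by a Jackson JsonGenerator; the commit's real implementations are created through CommonCrawlFormatFactory (e.g. the "JACKSON" format requested by CommonCrawlDataDumper further below):

import java.io.IOException;
import java.io.StringWriter;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.metadata.Metadata;
import org.apache.nutch.protocol.Content;
import org.apache.nutch.tools.AbstractCommonCrawlFormat;
import org.apache.nutch.tools.CommonCrawlConfig;

// Hypothetical example: maps the template hooks of AbstractCommonCrawlFormat
// onto a streaming JSON generator.
public class SimpleJsonFormat extends AbstractCommonCrawlFormat {
  private final StringWriter writer = new StringWriter();
  private final JsonGenerator generator;

  public SimpleJsonFormat(String url, Content content, Metadata metadata,
      Configuration nutchConf, CommonCrawlConfig config) throws IOException {
    super(url, content, metadata, nutchConf, config);
    generator = new JsonFactory().createGenerator(writer);
  }

  @Override
  protected void writeKeyValue(String key, String value) throws IOException {
    generator.writeStringField(key, value);
  }

  @Override
  protected void writeKeyNull(String key) throws IOException {
    generator.writeNullField(key);
  }

  @Override
  protected void startObject(String key) throws IOException {
    if (key != null) {
      generator.writeFieldName(key); // nested object: emit its key first
    }
    generator.writeStartObject();
  }

  @Override
  protected void closeObject(String key) throws IOException {
    generator.writeEndObject();
  }

  @Override
  protected void startArray(String key, boolean nested, boolean newline)
      throws IOException {
    if (key != null) {
      generator.writeFieldName(key);
    }
    generator.writeStartArray();
  }

  @Override
  protected void closeArray(String key, boolean nested, boolean newline)
      throws IOException {
    generator.writeEndArray();
  }

  @Override
  protected void writeArrayValue(String value) throws IOException {
    generator.writeString(value);
  }

  @Override
  protected String generateJson() throws IOException {
    generator.flush();
    return writer.toString();
  }
}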

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java b/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java
new file mode 100755
index 0000000..ba42745
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/Benchmark.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.OutputStream;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.CrawlDbReader;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.Injector;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+public class Benchmark extends Configured implements Tool {
+  private static final Log LOG = LogFactory.getLog(Benchmark.class);
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = NutchConfiguration.create();
+    int res = ToolRunner.run(conf, new Benchmark(), args);
+    System.exit(res);
+  }
+
+  @SuppressWarnings("unused")
+  private static String getDate() {
+    return new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(System
+        .currentTimeMillis()));
+  }
+
+  private void createSeeds(FileSystem fs, Path seedsDir, int count)
+      throws Exception {
+    OutputStream os = fs.create(new Path(seedsDir, "seeds"));
+    try {
+      for (int i = 0; i < count; i++) {
+        String url = "http://www.test-" + i + ".com/\r\n";
+        os.write(url.getBytes());
+      }
+      os.flush();
+    } finally {
+      os.close();
+    }
+  }
+
+  public static final class BenchmarkResults {
+    Map<String, Map<String, Long>> timings = new HashMap<String, Map<String, Long>>();
+    List<String> runs = new ArrayList<String>();
+    List<String> stages = new ArrayList<String>();
+    int seeds, depth, threads;
+    boolean delete;
+    long topN;
+    long elapsed;
+    String plugins;
+
+    public void addTiming(String stage, String run, long timing) {
+      if (!runs.contains(run)) {
+        runs.add(run);
+      }
+      if (!stages.contains(stage)) {
+        stages.add(stage);
+      }
+      Map<String, Long> t = timings.get(stage);
+      if (t == null) {
+        t = new HashMap<String, Long>();
+        timings.put(stage, t);
+      }
+      t.put(run, timing);
+    }
+
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("* Plugins:\t" + plugins + "\n");
+      sb.append("* Seeds:\t" + seeds + "\n");
+      sb.append("* Depth:\t" + depth + "\n");
+      sb.append("* Threads:\t" + threads + "\n");
+      sb.append("* TopN:\t" + topN + "\n");
+      sb.append("* Delete:\t" + delete + "\n");
+      sb.append("* TOTAL ELAPSED:\t" + elapsed + "\n");
+      for (String stage : stages) {
+        Map<String, Long> timing = timings.get(stage);
+        if (timing == null)
+          continue;
+        sb.append("- stage: " + stage + "\n");
+        for (String r : runs) {
+          Long time = timing.get(r);
+          if (time == null) {
+            continue;
+          }
+          sb.append("\trun " + r + "\t" + time + "\n");
+        }
+      }
+      return sb.toString();
+    }
+
+    public List<String> getStages() {
+      return stages;
+    }
+
+    public List<String> getRuns() {
+      return runs;
+    }
+  }
+
+  public int run(String[] args) throws Exception {
+    String plugins = "protocol-http|parse-tika|scoring-opic|urlfilter-regex|urlnormalizer-pass";
+    int seeds = 1;
+    int depth = 10;
+    int threads = 10;
+    boolean delete = true;
+    long topN = Long.MAX_VALUE;
+
+    if (args.length == 0) {
+      System.err
+          .println("Usage: Benchmark [-seeds NN] [-depth NN] [-threads NN] [-keep] [-maxPerHost NN] [-plugins <regex>]");
+      System.err
+          .println("\t-seeds NN\tcreate NN unique hosts in a seed list (default: 1)");
+      System.err.println("\t-depth NN\tperform NN crawl cycles (default: 10)");
+      System.err
+          .println("\t-threads NN\tuse NN threads per Fetcher task (default: 10)");
+      System.err
+          .println("\t-keep\tkeep segment data (default: delete after updatedb)");
+      System.err.println("\t-plugins <regex>\toverride 'plugin.includes'.");
+      System.err.println("\tNOTE: if not specified, this is reset to: "
+          + plugins);
+      System.err
+          .println("\tNOTE: if 'default' is specified then a value set in nutch-default/nutch-site is used.");
+      System.err
+          .println("\t-maxPerHost NN\tmax. # of URLs per host in a fetchlist");
+      return -1;
+    }
+    int maxPerHost = Integer.MAX_VALUE;
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-seeds")) {
+        seeds = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-threads")) {
+        threads = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-depth")) {
+        depth = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-keep")) {
+        delete = false;
+      } else if (args[i].equals("-plugins")) {
+        plugins = args[++i];
+      } else if (args[i].equalsIgnoreCase("-maxPerHost")) {
+        maxPerHost = Integer.parseInt(args[++i]);
+      } else {
+        LOG.fatal("Invalid argument: '" + args[i] + "'");
+        return -1;
+      }
+    }
+    BenchmarkResults res = benchmark(seeds, depth, threads, maxPerHost, topN,
+        delete, plugins);
+    System.out.println(res);
+    return 0;
+  }
+
+  public BenchmarkResults benchmark(int seeds, int depth, int threads,
+      int maxPerHost, long topN, boolean delete, String plugins)
+      throws Exception {
+    Configuration conf = getConf();
+    conf.set("http.proxy.host", "localhost");
+    conf.setInt("http.proxy.port", 8181);
+    conf.set("http.agent.name", "test");
+    conf.set("http.robots.agents", "test,*");
+    if (!plugins.equals("default")) {
+      conf.set("plugin.includes", plugins);
+    }
+    conf.setInt(Generator.GENERATOR_MAX_COUNT, maxPerHost);
+    conf.set(Generator.GENERATOR_COUNT_MODE,
+        Generator.GENERATOR_COUNT_VALUE_HOST);
+    JobConf job = new NutchJob(getConf());
+    FileSystem fs = FileSystem.get(job);
+    Path dir = new Path(getConf().get("hadoop.tmp.dir"), "bench-"
+        + System.currentTimeMillis());
+    fs.mkdirs(dir);
+    Path rootUrlDir = new Path(dir, "seed");
+    fs.mkdirs(rootUrlDir);
+    createSeeds(fs, rootUrlDir, seeds);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("crawl started in: " + dir);
+      LOG.info("rootUrlDir = " + rootUrlDir);
+      LOG.info("threads = " + threads);
+      LOG.info("depth = " + depth);
+    }
+    BenchmarkResults res = new BenchmarkResults();
+    res.delete = delete;
+    res.depth = depth;
+    res.plugins = plugins;
+    res.seeds = seeds;
+    res.threads = threads;
+    res.topN = topN;
+    Path crawlDb = new Path(dir + "/crawldb");
+    Path linkDb = new Path(dir + "/linkdb");
+    Path segments = new Path(dir + "/segments");
+    res.elapsed = System.currentTimeMillis();
+    Injector injector = new Injector(getConf());
+    Generator generator = new Generator(getConf());
+    Fetcher fetcher = new Fetcher(getConf());
+    ParseSegment parseSegment = new ParseSegment(getConf());
+    CrawlDb crawlDbTool = new CrawlDb(getConf());
+    LinkDb linkDbTool = new LinkDb(getConf());
+
+    // initialize crawlDb
+    long start = System.currentTimeMillis();
+    injector.inject(crawlDb, rootUrlDir);
+    long delta = System.currentTimeMillis() - start;
+    res.addTiming("inject", "0", delta);
+    int i;
+    for (i = 0; i < depth; i++) { // generate new segment
+      start = System.currentTimeMillis();
+      Path[] segs = generator.generate(crawlDb, segments, -1, topN,
+          System.currentTimeMillis());
+      delta = System.currentTimeMillis() - start;
+      res.addTiming("generate", i + "", delta);
+      if (segs == null) {
+        LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");
+        break;
+      }
+      start = System.currentTimeMillis();
+      fetcher.fetch(segs[0], threads); // fetch it
+      delta = System.currentTimeMillis() - start;
+      res.addTiming("fetch", i + "", delta);
+      if (!Fetcher.isParsing(job)) {
+        start = System.currentTimeMillis();
+        parseSegment.parse(segs[0]); // parse it, if needed
+        delta = System.currentTimeMillis() - start;
+        res.addTiming("parse", i + "", delta);
+      }
+      start = System.currentTimeMillis();
+      crawlDbTool.update(crawlDb, segs, true, true); // update crawldb
+      delta = System.currentTimeMillis() - start;
+      res.addTiming("update", i + "", delta);
+      start = System.currentTimeMillis();
+      linkDbTool.invert(linkDb, segs, true, true, false); // invert links
+      delta = System.currentTimeMillis() - start;
+      res.addTiming("invert", i + "", delta);
+      // delete data
+      if (delete) {
+        for (Path p : segs) {
+          fs.delete(p, true);
+        }
+      }
+    }
+    if (i == 0) {
+      LOG.warn("No URLs to fetch - check your seed list and URL filters.");
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("crawl finished: " + dir);
+    }
+    res.elapsed = System.currentTimeMillis() - res.elapsed;
+    CrawlDbReader dbreader = new CrawlDbReader();
+    dbreader.processStatJob(crawlDb.toString(), job, false);
+    return res;
+  }
+
+}
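
Because Benchmark is a standard Hadoop Tool, it can be launched programmatically as well as from the command line. A minimal sketch (the argument values are illustrative):

import org.apache.hadoop.util.ToolRunner;
import org.apache.nutch.tools.Benchmark;
import org.apache.nutch.util.NutchConfiguration;

public class BenchmarkDriver {
  public static void main(String[] args) throws Exception {
    // Equivalent to: bin/nutch org.apache.nutch.tools.Benchmark \
    //                    -seeds 5 -depth 3 -threads 4
    int res = ToolRunner.run(NutchConfiguration.create(), new Benchmark(),
        new String[] { "-seeds", "5", "-depth", "3", "-threads", "4" });
    System.exit(res);
  }
}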

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java
new file mode 100644
index 0000000..d8c06c0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlConfig.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Properties;
+
+public class CommonCrawlConfig implements Serializable {
+
+	/**
+	 * Serial version UID
+	 */
+	private static final long serialVersionUID = 5235013733207799661L;
+	
+	// Prefix for key value in the output format
+	private String keyPrefix = "";
+	
+	private boolean simpleDateFormat = false;
+	
+	private boolean jsonArray = false;
+	
+	private boolean reverseKey = false;
+	
+	private String reverseKeyValue = "";
+
+	private boolean compressed = false;
+
+	private long warcSize = 0;
+
+	private String outputDir;
+	
+	/**
+	 * Default constructor
+	 */
+	public CommonCrawlConfig() {
+		// TODO init(this.getClass().getResourceAsStream("CommonCrawlConfig.properties"));
+	}
+	
+	public CommonCrawlConfig(InputStream stream) {
+		init(stream);
+	}
+	
+	private void init(InputStream stream) {
+		if (stream == null) {
+			return;
+		}
+		Properties properties = new Properties();
+		
+		try {
+			properties.load(stream);
+		} catch (IOException e) {
+			// ignore: unreadable properties leave the defaults in place
+		} finally {
+			try {
+				stream.close();
+			} catch (IOException e) {
+				// ignore: nothing more can be done with the stream
+			}
+		}
+
+		setKeyPrefix(properties.getProperty("keyPrefix", ""));
+		setSimpleDateFormat(Boolean.parseBoolean(properties.getProperty("simpleDateFormat", "False")));
+		setJsonArray(Boolean.parseBoolean(properties.getProperty("jsonArray", "False")));
+		setReverseKey(Boolean.parseBoolean(properties.getProperty("reverseKey", "False")));
+	}
+	
+	public void setKeyPrefix(String keyPrefix) {
+		this.keyPrefix = keyPrefix;
+	}
+	
+	public void setSimpleDateFormat(boolean simpleDateFormat) {
+		this.simpleDateFormat = simpleDateFormat;
+	}
+	
+	public void setJsonArray(boolean jsonArray) {
+		this.jsonArray = jsonArray;
+	}
+	
+	public void setReverseKey(boolean reverseKey) {
+		this.reverseKey = reverseKey;
+	}
+	
+	public void setReverseKeyValue(String reverseKeyValue) {
+		this.reverseKeyValue = reverseKeyValue;
+	}
+	
+	public String getKeyPrefix() {
+		return this.keyPrefix;
+	}
+	
+	public boolean getSimpleDateFormat() {
+		return this.simpleDateFormat;
+	}
+	
+	public boolean getJsonArray() {
+		return this.jsonArray;
+	}
+	
+	public boolean getReverseKey() {
+		return this.reverseKey;
+	}
+	
+	public String getReverseKeyValue() {
+		return this.reverseKeyValue;
+	}
+
+	public boolean isCompressed() {
+		return compressed;
+	}
+
+	public void setCompressed(boolean compressed) {
+		this.compressed = compressed;
+	}
+
+	public long getWarcSize() {
+		return warcSize;
+	}
+
+	public void setWarcSize(long warcSize) {
+		this.warcSize = warcSize;
+	}
+
+	public String getOutputDir() {
+		return outputDir;
+	}
+
+	public void setOutputDir(String outputDir) {
+		this.outputDir = outputDir;
+	}
+}
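
The commented-out default constructor suggests this configuration was meant to be loadable from a CommonCrawlConfig.properties resource. A short sketch of loading it that way, using the four keys that init() actually reads (file name and values are illustrative):

import java.io.InputStream;

import org.apache.nutch.tools.CommonCrawlConfig;

public class CommonCrawlConfigExample {
  public static void main(String[] args) {
    // CommonCrawlConfig.properties (hypothetical contents):
    //   keyPrefix=com.example
    //   simpleDateFormat=true
    //   jsonArray=false
    //   reverseKey=true
    InputStream stream = CommonCrawlConfigExample.class
        .getResourceAsStream("CommonCrawlConfig.properties");
    // init() simply returns if the stream is null, keeping the defaults
    CommonCrawlConfig config = new CommonCrawlConfig(stream);
    System.out.println("keyPrefix=" + config.getKeyPrefix());
  }
}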

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
new file mode 100644
index 0000000..b4fc0a7
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlDataDumper.java
@@ -0,0 +1,716 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+//JDK imports
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
+//Commons imports
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.FilenameUtils;
+
+//Hadoop
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.Inlink;
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.crawl.LinkDbReader;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.DumpFileUtil;
+import org.apache.nutch.util.NutchConfiguration;
+//Tika imports
+import org.apache.tika.Tika;
+
+import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
+import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.ibm.icu.text.DateFormat;
+import com.ibm.icu.text.SimpleDateFormat;
+
+/**
+ * <p>
+ * The Common Crawl Data Dumper tool enables one to reverse generate the raw
+ * content from Nutch segment data directories into a common crawling data
+ * format, consumed by many applications. The data is then serialized as
+ * <a href="http://cbor.io">CBOR</a>.
+ * </p>
+ * <p>
+ * Text content will be stored in a structured document format. Below is a
+ * schema for storage of data and metadata related to a crawling request, with
+ * the response body truncated for readability. This document must be encoded
+ * using CBOR and should be compressed with gzip after encoding. The timestamped
+ * URL key for these records follows the same layout as the media file
+ * directory structure, with underscores in place of directory separators.
+ * </p>
+ * <p>
+ * Thus, the timestamped URL key for the record is provided below, followed by
+ * an example record:
+ * </p>
+ * <pre>
+ * {@code
+ * com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000
+ *
+ *     {
+ *         "url": "http:\/\/somepage.com\/22\/14560817",
+ *         "timestamp": "1411623696000",
+ *         "request": {
+ *             "method": "GET",
+ *             "client": {
+ *                 "hostname": "crawler01.local",
+ *                 "address": "74.347.129.200",
+ *                 "software": "Apache Nutch v1.10",
+ *                 "robots": "classic",
+ *                 "contact": {
+ *                     "name": "Nutch Admin",
+ *                     "email": "nutch.pro@nutchadmin.org"
+ *                 }
+ *             },
+ *             "headers": {
+ *                 "Accept": "text\/html,application\/xhtml+xml,application\/xml",
+ *                 "Accept-Encoding": "gzip,deflate,sdch",
+ *                 "Accept-Language": "en-US,en",
+ *                 "User-Agent": "Mozilla\/5.0",
+ *                 "...": "..."
+ *             },
+ *             "body": null
+ *         },
+ *         "response": {
+ *             "status": "200",
+ *             "server": {
+ *                 "hostname": "somepage.com",
+ *                 "address": "55.33.51.19"
+ *             },
+ *             "headers": {
+ *                 "Content-Encoding": "gzip",
+ *                 "Content-Type": "text\/html",
+ *                 "Date": "Thu, 25 Sep 2014 04:16:58 GMT",
+ *                 "Expires": "Thu, 25 Sep 2014 04:16:57 GMT",
+ *                 "Server": "nginx",
+ *                 "...": "..."
+ *             },
+ *             "body": "\r\n  <!DOCTYPE html PUBLIC ... \r\n\r\n  \r\n    </body>\r\n    </html>\r\n  \r\n\r\n"
+ *         },
+ *         "key": "com_somepage_33a3e36bbef59c2a5242c2ccee59239ab30d51f3_1411623696000",
+ *         "imported": "1411623698000"
+ *     }
+ * }
+ * </pre>
+ * <p>
+ * Upon successful completion the tool displays a JSON snippet detailing the
+ * mimetype classifications and the counts of documents which fall into those
+ * classifications. An example is as follows:
+ * </p>
+ * <pre>
+ * {@code
+ * INFO: File Types:
+ *   TOTAL Stats:    {
+ *     {"mimeType":"application/xml","count":19}
+ *     {"mimeType":"image/png","count":47}
+ *     {"mimeType":"image/jpeg","count":141}
+ *     {"mimeType":"image/vnd.microsoft.icon","count":4}
+ *     {"mimeType":"text/plain","count":89}
+ *     {"mimeType":"video/quicktime","count":2}
+ *     {"mimeType":"image/gif","count":63}
+ *     {"mimeType":"application/xhtml+xml","count":1670}
+ *     {"mimeType":"application/octet-stream","count":40}
+ *     {"mimeType":"text/html","count":1863}
+ *   }
+ * }
+ * </pre>
+ */
+public class CommonCrawlDataDumper extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CommonCrawlDataDumper.class.getName());
+  private static final int MAX_INLINKS = 5000;
+  
+  private CommonCrawlConfig config = null;
+
+  // Gzip initialization
+  private FileOutputStream fileOutput = null;
+  private BufferedOutputStream bufOutput = null;
+  private GzipCompressorOutputStream gzipOutput = null;
+  private TarArchiveOutputStream tarOutput = null;
+  private ArrayList<String> fileList = null;
+
+  /**
+   * Main method for invoking this tool
+   *
+   * @param args 1) output directory (which will be created if it does not
+   *             already exist) to host the CBOR data and 2) a directory
+   *             containing one or more segments from which we wish to generate
+   *             CBOR data. Optionally, 3) a list of mimetypes and 4) the gzip
+   *             option may be provided.
+   * @throws Exception if the tool fails to run
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = NutchConfiguration.create();
+    int res = ToolRunner.run(conf, new CommonCrawlDataDumper(), args);
+    System.exit(res);
+  }
+
+  /**
+   * Constructor
+   */
+  public CommonCrawlDataDumper(CommonCrawlConfig config) {
+    this.config = config;
+  }
+
+  public CommonCrawlDataDumper() {
+  }
+
+  /**
+   * Dumps the reverse-engineered CBOR content from the provided segment
+   * directory. A parent directory containing more than one segment may be
+   * passed, or a single segment. If the gzip flag is set, the CBOR output is
+   * also zipped.
+   *
+   * @param outputDir      the directory you wish to dump the raw content to. This
+   *                       directory will be created.
+   * @param segmentRootDir a directory containing one or more segments.
+   * @param linkdb         Path to linkdb.
+   * @param gzip           a boolean flag indicating whether the CBOR content should also
+   *                       be gzipped.
+   * @param mimeTypes      an optional list of mimetypes to dump, excluding all others.
+   * @param epochFilename  if {@code true}, output files will be named using the epoch time (in milliseconds).
+   * @param extension      a file extension to use with output documents.
+   * @param warc           if {@code true}, export to a WARC file instead of CBOR.
+   * @throws Exception if any exception occurs.
+   */
+  public void dump(File outputDir, File segmentRootDir, File linkdb, boolean gzip,
+      String[] mimeTypes, boolean epochFilename, String extension, boolean warc)
+      throws Exception {
+    if (gzip) {
+      LOG.info("Gzipping CBOR data has been skipped");
+    }
+    // total file counts
+    Map<String, Integer> typeCounts = new HashMap<String, Integer>();
+    // filtered file counters
+    Map<String, Integer> filteredCounts = new HashMap<String, Integer>();
+
+    Configuration nutchConfig = NutchConfiguration.create();
+    final FileSystem fs = FileSystem.get(nutchConfig);
+    Path segmentRootPath = new Path(segmentRootDir.toString());
+
+    //get all paths
+    List<Path> parts = new ArrayList<>();
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(segmentRootPath, true);
+    String partPattern = ".*" + File.separator + Content.DIR_NAME
+        + File.separator + "part-[0-9]{5}" + File.separator + "data";
+    while (files.hasNext()) {
+      LocatedFileStatus next = files.next();
+      if (next.isFile()) {
+        Path path = next.getPath();
+        if (path.toString().matches(partPattern)){
+          parts.add(path);
+        }
+      }
+    }
+
+    LinkDbReader linkDbReader = null;
+    if (linkdb != null) {
+      linkDbReader = new LinkDbReader(fs.getConf(), new Path(linkdb.toString()));
+    }
+    if (parts.isEmpty()) {
+      LOG.error("No segment directories found in {}",
+          segmentRootDir.getAbsolutePath());
+      System.exit(1);
+    }
+    LOG.info("Found {} segment parts", parts.size());
+    if (gzip && !warc) {
+      fileList = new ArrayList<>();
+      constructNewStream(outputDir);
+    }
+
+    for (Path segmentPart : parts) {
+      LOG.info("Processing segment Part : [ {} ]", segmentPart);
+      try {
+        SequenceFile.Reader reader = new SequenceFile.Reader(nutchConfig,
+            SequenceFile.Reader.file(segmentPart));
+
+        Writable key = (Writable) reader.getKeyClass().newInstance();
+
+        Content content = null;
+        while (reader.next(key)) {
+          content = new Content();
+          reader.getCurrentValue(content);
+          Metadata metadata = content.getMetadata();
+          String url = key.toString();
+
+          String baseName = FilenameUtils.getBaseName(url);
+          String extensionName = FilenameUtils.getExtension(url);
+
+          if (!extension.isEmpty()) {
+            extensionName = extension;
+          } else if ((extensionName == null) || extensionName.isEmpty()) {
+            extensionName = "html";
+          }
+
+          String outputFullPath = null;
+          String outputRelativePath = null;
+          String filename = null;
+          String timestamp = null;
+          String reverseKey = null;
+
+          if (epochFilename || config.getReverseKey()) {
+            try {
+              long epoch = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss z")
+                  .parse(getDate(metadata.get("Date"))).getTime();
+              timestamp = String.valueOf(epoch);
+            } catch (ParseException pe) {
+              LOG.warn(pe.getMessage());
+            }
+
+            reverseKey = reverseUrl(url);
+            config.setReverseKeyValue(
+                reverseKey.replace("/", "_") + "_" + DigestUtils.sha1Hex(url)
+                    + "_" + timestamp);
+          }
+
+          if (!warc) {
+            if (epochFilename) {
+              outputFullPath = DumpFileUtil
+                  .createFileNameFromUrl(outputDir.getAbsolutePath(),
+                      reverseKey, url, timestamp, extensionName, !gzip);
+              outputRelativePath = outputFullPath
+                  .substring(0, outputFullPath.lastIndexOf(File.separator));
+              filename = content.getMetadata().get(Metadata.DATE) + "."
+                  + extensionName;
+            } else {
+              String md5Ofurl = DumpFileUtil.getUrlMD5(url);
+              String fullDir = DumpFileUtil
+                  .createTwoLevelsDirectory(outputDir.getAbsolutePath(),
+                      md5Ofurl, !gzip);
+              filename = DumpFileUtil
+                  .createFileName(md5Ofurl, baseName, extensionName);
+              outputFullPath = String.format("%s/%s", fullDir, filename);
+
+              String[] fullPathLevels = fullDir.split(File.separator);
+              String firstLevelDirName = fullPathLevels[fullPathLevels.length
+                  - 2];
+              String secondLevelDirName = fullPathLevels[fullPathLevels.length
+                  - 1];
+              outputRelativePath = firstLevelDirName + File.separator
+                  + secondLevelDirName;
+            }
+          }
+          // Encode all filetypes if no mimetypes have been given
+          boolean filter = (mimeTypes == null);
+
+          String jsonData = "";
+          try {
+            String mimeType = new Tika().detect(content.getContent());
+            // Maps file to JSON-based structure
+
+            Set<String> inUrls = null; //there may be duplicates, so using set
+            if (linkDbReader != null) {
+              Inlinks inlinks = linkDbReader.getInlinks((Text) key);
+              if (inlinks != null) {
+                Iterator<Inlink> iterator = inlinks.iterator();
+                inUrls = new LinkedHashSet<>();
+                while (inUrls.size() <= MAX_INLINKS && iterator.hasNext()){
+                  inUrls.add(iterator.next().getFromUrl());
+                }
+              }
+            }
+            //TODO: Make this Jackson Format implementation reusable
+            try (CommonCrawlFormat format = CommonCrawlFormatFactory
+                .getCommonCrawlFormat(warc ? "WARC" : "JACKSON", nutchConfig, config)) {
+              if (inUrls != null) {
+                format.setInLinks(new ArrayList<>(inUrls));
+              }
+              jsonData = format.getJsonData(url, content, metadata);
+            }
+
+            collectStats(typeCounts, mimeType);
+            // collects statistics for the given mimetypes
+            if ((mimeType != null) && (mimeTypes != null) && Arrays
+                .asList(mimeTypes).contains(mimeType)) {
+              collectStats(filteredCounts, mimeType);
+              filter = true;
+            }
+          } catch (IOException ioe) {
+            LOG.error("Fatal error in creating JSON data: " + ioe.getMessage());
+            return;
+          }
+
+          if (!warc) {
+            if (filter) {
+              byte[] byteData = serializeCBORData(jsonData);
+
+              if (!gzip) {
+                File outputFile = new File(outputFullPath);
+                if (outputFile.exists()) {
+                  LOG.info("Skipping writing: [" + outputFullPath
+                      + "]: file already exists");
+                } else {
+                  LOG.info("Writing: [" + outputFullPath + "]");
+                  IOUtils.copy(new ByteArrayInputStream(byteData),
+                      new FileOutputStream(outputFile));
+                }
+              } else {
+                if (fileList.contains(outputFullPath)) {
+                  LOG.info("Skipping compressing: [" + outputFullPath
+                      + "]: file already exists");
+                } else {
+                  fileList.add(outputFullPath);
+                  LOG.info("Compressing: [" + outputFullPath + "]");
+                  //TarArchiveEntry tarEntry = new TarArchiveEntry(firstLevelDirName + File.separator + secondLevelDirName + File.separator + filename);
+                  TarArchiveEntry tarEntry = new TarArchiveEntry(
+                      outputRelativePath + File.separator + filename);
+                  tarEntry.setSize(byteData.length);
+                  tarOutput.putArchiveEntry(tarEntry);
+                  tarOutput.write(byteData);
+                  tarOutput.closeArchiveEntry();
+                }
+              }
+            }
+          }
+        }
+        reader.close();
+      } catch (Exception e) {
+        LOG.warn("SKIPPED: {} Because : {}", segmentPart, e.getMessage());
+      }
+      // do not close the shared FileSystem inside this loop: it is cached by
+      // Hadoop and still needed for the remaining segment parts
+    }
+
+    if (gzip && !warc) {
+      closeStream();
+    }
+
+    if (!typeCounts.isEmpty()) {
+      LOG.info("CommonsCrawlDataDumper File Stats: " + DumpFileUtil
+          .displayFileTypes(typeCounts, filteredCounts));
+    }
+
+  }
+
+  private void closeStream() {
+    try {
+      tarOutput.finish();
+
+      tarOutput.close();
+      gzipOutput.close();
+      bufOutput.close();
+      fileOutput.close();
+    } catch (IOException ioe) {
+      LOG.warn("Error in closing stream: " + ioe.getMessage());
+    }
+  }
+
+  private void constructNewStream(File outputDir) throws IOException {
+    String archiveName = new SimpleDateFormat("yyyyMMddhhmm'.tar.gz'")
+        .format(new Date());
+    LOG.info("Creating a new gzip archive: " + archiveName);
+    fileOutput = new FileOutputStream(
+        new File(outputDir + File.separator + archiveName));
+    bufOutput = new BufferedOutputStream(fileOutput);
+    gzipOutput = new GzipCompressorOutputStream(bufOutput);
+    tarOutput = new TarArchiveOutputStream(gzipOutput);
+    tarOutput.setLongFileMode(TarArchiveOutputStream.LONGFILE_GNU);
+  }
+
+  /**
+   * Writes the CBOR "Self-Describe Tag" (value 55799, serialized as 3-byte
+   * sequence of {@code 0xd9d9f7}) at the current position. This method must
+   * be used to write the CBOR magic number at the beginning of the document.
+   * Since version 2.5, <a
+   * href="https://github.com/FasterXML/jackson-dataformat-cbor"
+   * >jackson-dataformat-cbor</a> will support the {@code WRITE_TYPE_HEADER}
+   * feature to write that type tag at the beginning of the document.
+   *
+   * @param generator {@link CBORGenerator} object used to create a CBOR-encoded document.
+   * @throws IOException if any I/O error occurs.
+   * @see <a href="https://tools.ietf.org/html/rfc7049#section-2.4.5">RFC
+   * 7049</a>
+   */
+  private void writeMagicHeader(CBORGenerator generator) throws IOException {
+    // Writes self-describe CBOR
+    // https://tools.ietf.org/html/rfc7049#section-2.4.5
+    // It will be supported in jackson-cbor since 2.5
+    byte[] header = new byte[3];
+    header[0] = (byte) 0xd9;
+    header[1] = (byte) 0xd9;
+    header[2] = (byte) 0xf7;
+    generator.writeBytes(header, 0, header.length);
+  }
+
+  private byte[] serializeCBORData(String jsonData) {
+    CBORFactory factory = new CBORFactory();
+
+    CBORGenerator generator = null;
+    ByteArrayOutputStream stream = null;
+
+    try {
+      stream = new ByteArrayOutputStream();
+      generator = factory.createGenerator(stream);
+      // Writes CBOR tag
+      writeMagicHeader(generator);
+      generator.writeString(jsonData);
+      generator.flush();
+      stream.flush();
+
+      return stream.toByteArray();
+
+    } catch (Exception e) {
+      LOG.warn("CBOR encoding failed: " + e.getMessage());
+    } finally {
+      try {
+        if (generator != null) {
+          generator.close();
+        }
+        if (stream != null) {
+          stream.close();
+        }
+      } catch (IOException e) {
+        // nothing to do
+      }
+    }
+
+    return null;
+  }
+
+  private void collectStats(Map<String, Integer> typeCounts, String mimeType) {
+    typeCounts.put(mimeType,
+        typeCounts.containsKey(mimeType) ? typeCounts.get(mimeType) + 1 : 1);
+  }
+
+  /**
+   * Returns the given timestamp, or the current date when the given
+   * timestamp is empty or null.
+   *
+   * @param timestamp the timestamp
+   * @return the given timestamp, or the current date formatted the same way.
+   */
+  private String getDate(String timestamp) {
+    if (timestamp == null || timestamp.isEmpty()) {
+      DateFormat dateFormat = new SimpleDateFormat(
+          "EEE, d MMM yyyy HH:mm:ss z");
+      timestamp = dateFormat.format(new Date());
+    }
+    return timestamp;
+  }
+
+  public static String reverseUrl(String urlString) {
+    URL url;
+    String reverseKey = null;
+    try {
+      url = new URL(urlString);
+
+      String[] hostPart = url.getHost().replace('.', '/').split("/");
+
+      StringBuilder sb = new StringBuilder();
+      sb.append(hostPart[hostPart.length - 1]);
+      for (int i = hostPart.length - 2; i >= 0; i--) {
+        sb.append("/" + hostPart[i]);
+      }
+
+      reverseKey = sb.toString();
+
+    } catch (MalformedURLException e) {
+      LOG.error("Failed to parse URL: {}", urlString);
+    }
+
+    return reverseKey;
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    Option helpOpt = new Option("h", "help", false, "show this help message.");
+    // argument options
+    @SuppressWarnings("static-access")
+    Option outputOpt = OptionBuilder.withArgName("outputDir").hasArg()
+        .withDescription(
+            "output directory (which will be created) to host the CBOR data.")
+        .create("outputDir");
+    // WARC format
+    Option warcOpt = new Option("warc", "export to a WARC file");
+
+    @SuppressWarnings("static-access")
+    Option segOpt = OptionBuilder.withArgName("segment").hasArgs()
+        .withDescription("the segment or directory containing segments to use").create("segment");
+    // create mimetype and gzip options
+    @SuppressWarnings("static-access")
+    Option mimeOpt = OptionBuilder.isRequired(false).withArgName("mimetype")
+        .hasArgs().withDescription(
+            "an optional list of mimetypes to dump, excluding all others. Defaults to all.")
+        .create("mimetype");
+    @SuppressWarnings("static-access")
+    Option gzipOpt = OptionBuilder.withArgName("gzip").hasArg(false)
+        .withDescription(
+            "an optional flag indicating whether to additionally gzip the data.")
+        .create("gzip");
+    @SuppressWarnings("static-access")
+    Option keyPrefixOpt = OptionBuilder.withArgName("keyPrefix").hasArg(true)
+        .withDescription("an optional prefix for key in the output format.")
+        .create("keyPrefix");
+    @SuppressWarnings("static-access")
+    Option simpleDateFormatOpt = OptionBuilder.withArgName("SimpleDateFormat")
+        .hasArg(false).withDescription(
+            "an optional format for timestamp in GMT epoch milliseconds.")
+        .create("SimpleDateFormat");
+    @SuppressWarnings("static-access")
+    Option epochFilenameOpt = OptionBuilder.withArgName("epochFilename")
+        .hasArg(false)
+        .withDescription("an optional format for output filename.")
+        .create("epochFilename");
+    @SuppressWarnings("static-access")
+    Option jsonArrayOpt = OptionBuilder.withArgName("jsonArray").hasArg(false)
+        .withDescription("an optional format for JSON output.")
+        .create("jsonArray");
+    @SuppressWarnings("static-access")
+    Option reverseKeyOpt = OptionBuilder.withArgName("reverseKey").hasArg(false)
+        .withDescription("an optional format for key value in JSON output.")
+        .create("reverseKey");
+    @SuppressWarnings("static-access")
+    Option extensionOpt = OptionBuilder.withArgName("extension").hasArg(true)
+        .withDescription("an optional file extension for output documents.")
+        .create("extension");
+    @SuppressWarnings("static-access")
+    Option sizeOpt = OptionBuilder.withArgName("warcSize").hasArg(true)
+        .withType(Number.class)
+        .withDescription("an optional file size in bytes for the WARC file(s)")
+        .create("warcSize");
+    @SuppressWarnings("static-access")
+    Option linkDbOpt = OptionBuilder.withArgName("linkdb").hasArg(true)
+        .withDescription("an optional linkdb parameter to include inlinks in dump files")
+        .isRequired(false)
+        .create("linkdb");
+
+    // create the options
+    Options options = new Options();
+    options.addOption(helpOpt);
+    options.addOption(outputOpt);
+    options.addOption(segOpt);
+    // create mimetypes and gzip options
+    options.addOption(warcOpt);
+    options.addOption(mimeOpt);
+    options.addOption(gzipOpt);
+    // create keyPrefix option
+    options.addOption(keyPrefixOpt);
+    // create simpleDateFormat option
+    options.addOption(simpleDateFormatOpt);
+    options.addOption(epochFilenameOpt);
+    options.addOption(jsonArrayOpt);
+    options.addOption(reverseKeyOpt);
+    options.addOption(extensionOpt);
+    options.addOption(sizeOpt);
+    options.addOption(linkDbOpt);
+
+    CommandLineParser parser = new GnuParser();
+    try {
+      CommandLine line = parser.parse(options, args);
+      if (line.hasOption("help") || !line.hasOption("outputDir") || (!line
+          .hasOption("segment"))) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter
+            .printHelp(CommonCrawlDataDumper.class.getName(), options, true);
+        return 0;
+      }
+
+      File outputDir = new File(line.getOptionValue("outputDir"));
+      File segmentRootDir = new File(line.getOptionValue("segment"));
+      String[] mimeTypes = line.getOptionValues("mimetype");
+      boolean gzip = line.hasOption("gzip");
+      boolean epochFilename = line.hasOption("epochFilename");
+
+      String keyPrefix = line.getOptionValue("keyPrefix", "");
+      boolean simpleDateFormat = line.hasOption("SimpleDateFormat");
+      boolean jsonArray = line.hasOption("jsonArray");
+      boolean reverseKey = line.hasOption("reverseKey");
+      String extension = line.getOptionValue("extension", "");
+      boolean warc = line.hasOption("warc");
+      long warcSize = 0;
+
+      if (line.getParsedOptionValue("warcSize") != null) {
+        warcSize = (Long) line.getParsedOptionValue("warcSize");
+      }
+      String linkdbPath = line.getOptionValue("linkdb");
+      File linkdb = linkdbPath == null ? null : new File(linkdbPath);
+
+      CommonCrawlConfig config = new CommonCrawlConfig();
+      config.setKeyPrefix(keyPrefix);
+      config.setSimpleDateFormat(simpleDateFormat);
+      config.setJsonArray(jsonArray);
+      config.setReverseKey(reverseKey);
+      config.setCompressed(gzip);
+      config.setWarcSize(warcSize);
+      config.setOutputDir(line.getOptionValue("outputDir"));
+
+      if (!outputDir.exists()) {
+        LOG.warn("Output directory: [" + outputDir.getAbsolutePath()
+            + "]: does not exist, creating it.");
+        if (!outputDir.mkdirs())
+          throw new Exception(
+              "Unable to create: [" + outputDir.getAbsolutePath() + "]");
+      }
+
+      CommonCrawlDataDumper dumper = new CommonCrawlDataDumper(config);
+
+      dumper.dump(outputDir, segmentRootDir, linkdb, gzip, mimeTypes, epochFilename,
+          extension, warc);
+
+    } catch (Exception e) {
+      LOG.error(CommonCrawlDataDumper.class.getName() + ": " + StringUtils
+          .stringifyException(e));
+      e.printStackTrace();
+      return -1;
+    }
+
+    return 0;
+  }
+}
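
Putting the options parsed in run() together, a typical invocation of the dumper might look like the following (paths and the mimetype list are illustrative):

  bin/nutch org.apache.nutch.tools.CommonCrawlDataDumper -outputDir /tmp/cbor \
      -segment crawl/segments -mimetype text/html -gzip -linkdb crawl/linkdb

Each matched document is CBOR-encoded, prefixed with the 0xd9d9f7 self-describe tag written by writeMagicHeader(), and, with -gzip, appended to a timestamped .tar.gz archive created by constructNewStream().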

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java
new file mode 100644
index 0000000..0834d95
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormat.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.protocol.Content;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Interface for all CommonCrawl formatters. It provides the signatures of the
+ * methods used to get JSON data.
+ *
+ * @author gtotaro
+ *
+ */
+public interface CommonCrawlFormat extends Closeable {
+
+  /**
+   * Returns a string representation of the JSON structure of the document
+   * this formatter was created for.
+   *
+   * @return the JSON data
+   */
+  public String getJsonData() throws IOException;
+
+  /**
+   * Returns a string representation of the JSON structure of the URL content.
+   *
+   * @param url the canonical URL of the document
+   * @param content the raw content of the document
+   * @param metadata the document metadata
+   * @return the JSON data
+   */
+  public String getJsonData(String url, Content content, Metadata metadata)
+      throws IOException;
+
+  /**
+   * Returns a string representation of the JSON structure of the URL content,
+   * taking into account the parsed metadata about the URL.
+   *
+   * @param url the canonical URL of the document
+   * @param content the raw content of the document
+   * @param metadata the document metadata
+   * @param parseData the parsed data of the document
+   * @return the JSON data
+   */
+  public String getJsonData(String url, Content content, Metadata metadata,
+      ParseData parseData) throws IOException;
+
+
+  /**
+   * Sets the inlinks of this document.
+   *
+   * @param inLinks list of inlinks
+   */
+  void setInLinks(List<String> inLinks);
+
+
+  /**
+   * Gets the inlinks of this document.
+   *
+   * @return the list of inlinks
+   */
+  List<String> getInLinks();
+
+  /**
+   * Optional method to be implemented if the actual format needs a close
+   * procedure.
+   */
+  public void close();
+}
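
Because CommonCrawlFormat extends Closeable, a formatter is best driven with
try-with-resources; close() matters for formats that keep buffers or files
open (the WARC formatter below, for instance). A minimal per-document sketch,
assuming conf, config, url, content, metadata and inLinks are already in
scope:

    // Illustrative only: typical per-document use of a formatter obtained
    // from CommonCrawlFormatFactory (defined in the next file).
    void formatOne() throws IOException {
      try (CommonCrawlFormat format = CommonCrawlFormatFactory
          .getCommonCrawlFormat("JACKSON", conf, config)) {
        format.setInLinks(inLinks);          // optional
        String json = format.getJsonData(url, content, metadata);
        System.out.println(json);
      }
    }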

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
new file mode 100644
index 0000000..8814168
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * Factory class that creates new {@link CommonCrawlFormat} objects (a.k.a.
+ * formatters) that map crawled files to the CommonCrawl format.
+ *
+ */
+public class CommonCrawlFormatFactory {
+	
+	/**
+	 * Returns a new instance of a {@link CommonCrawlFormat} object for the given
+	 * formatter type.
+	 * @param formatType the type of formatter to be created.
+	 * @param url the url.
+	 * @param content the content.
+	 * @param metadata the metadata.
+	 * @param nutchConf the configuration.
+	 * @param config the CommonCrawl output configuration.
+	 * @return the new {@link CommonCrawlFormat} object, or {@code null} if the
+	 *         type is unknown.
+	 * @throws IOException If any I/O error occurs.
+	 * @deprecated
+	 */
+	public static CommonCrawlFormat getCommonCrawlFormat(String formatType, String url, Content content,	Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+		if (formatType == null) {
+			return null;
+		}
+		
+		if (formatType.equalsIgnoreCase("jackson")) {
+			return new CommonCrawlFormatJackson(url, content, metadata, nutchConf, config);
+		}
+		else if (formatType.equalsIgnoreCase("jettinson")) {
+			return new CommonCrawlFormatJettinson(url, content, metadata, nutchConf, config);
+		}
+		else if (formatType.equalsIgnoreCase("simple")) {
+			return new CommonCrawlFormatSimple(url, content, metadata, nutchConf, config);
+		}
+		
+		return null;
+	}
+
+	// The format should not depend on per-document attributes; essentially,
+	// one formatter instance should serve the whole job.
+	public static CommonCrawlFormat getCommonCrawlFormat(String formatType, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+		if (formatType.equalsIgnoreCase("WARC")) {
+			return new CommonCrawlFormatWARC(nutchConf, config);
+		}
+
+		if (formatType.equalsIgnoreCase("JACKSON")) {
+			return new CommonCrawlFormatJackson( nutchConf, config);
+		}
+		return null;
+	}
+}
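
Both factory methods return null for an unknown format type rather than
throwing, so callers should check the result. A short sketch of the job-level
method (conf and config assumed in scope):

    // Illustrative only: the job-level factory method accepts "WARC" and
    // "JACKSON"; anything else yields null.
    CommonCrawlFormat format = CommonCrawlFormatFactory
        .getCommonCrawlFormat("WARC", conf, config);
    if (format == null) {
      throw new IllegalArgumentException("Unsupported format type");
    }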

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
new file mode 100644
index 0000000..0d6cae2
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJackson.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * This class provides methods to map crawled data to JSON using the Jackson
+ * Streaming API.
+ *
+ */
+public class CommonCrawlFormatJackson extends AbstractCommonCrawlFormat {
+	
+	private ByteArrayOutputStream out;
+	
+	private JsonGenerator generator;
+
+	public CommonCrawlFormatJackson(Configuration nutchConf,
+			CommonCrawlConfig config) throws IOException {
+		super(null, null, null, nutchConf, config);
+
+		JsonFactory factory = new JsonFactory();
+		this.out = new ByteArrayOutputStream();
+		this.generator = factory.createGenerator(out);
+
+		this.generator.useDefaultPrettyPrinter(); // INDENTED OUTPUT
+	}
+	
+	public CommonCrawlFormatJackson(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+		super(url, content, metadata, nutchConf, config);
+		
+		JsonFactory factory = new JsonFactory();
+		this.out = new ByteArrayOutputStream();
+		this.generator = factory.createGenerator(out);
+		
+		this.generator.useDefaultPrettyPrinter(); // INDENTED OUTPUT
+	}
+	
+	@Override
+	protected void writeKeyValue(String key, String value) throws IOException {
+		generator.writeFieldName(key);
+		generator.writeString(value);
+	}
+	
+	@Override
+	protected void writeKeyNull(String key) throws IOException {
+		generator.writeFieldName(key);
+		generator.writeNull();
+	}
+	
+	@Override
+	protected void startArray(String key, boolean nested, boolean newline) throws IOException {
+		if (key != null) {
+			generator.writeFieldName(key);
+		}
+		generator.writeStartArray();
+	}
+	
+	@Override
+	protected void closeArray(String key, boolean nested, boolean newline) throws IOException {
+		generator.writeEndArray();
+	}
+	
+	@Override
+	protected void writeArrayValue(String value) throws IOException {
+		generator.writeString(value);
+	}
+	
+	@Override
+	protected void startObject(String key) throws IOException {
+		if (key != null) {
+			generator.writeFieldName(key);
+		}
+		generator.writeStartObject();
+	}
+	
+	@Override
+	protected void closeObject(String key) throws IOException {
+		generator.writeEndObject();
+	}
+	
+	@Override
+	protected String generateJson() throws IOException {
+		this.generator.flush();
+		return this.out.toString();
+	}
+}
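
For reference, the Jackson streaming calls used by the callbacks above reduce
to the following self-contained sketch, which prints a pretty-printed
one-field object:

    // Standalone sketch of the Jackson streaming pattern used above.
    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;
    import java.io.ByteArrayOutputStream;

    public class JacksonSketch {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        JsonGenerator g = new JsonFactory().createGenerator(out);
        g.useDefaultPrettyPrinter();          // indented output
        g.writeStartObject();                 // startObject(null)
        g.writeFieldName("url");              // writeKeyValue("url", ...)
        g.writeString("http://example.com/");
        g.writeEndObject();                   // closeObject(null)
        g.flush();                            // generateJson() flushes first
        System.out.println(out.toString());   // { "url" : "http://example.com/" }
      }
    }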

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
new file mode 100644
index 0000000..6950e2a
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatJettinson.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * This class provides methods to map crawled data to JSON using the Jettison
+ * API.
+ *
+ */
+public class CommonCrawlFormatJettinson extends AbstractCommonCrawlFormat {
+	
+	private Deque<JSONObject> stackObjects;
+	
+	private Deque<JSONArray> stackArrays;
+
+	public CommonCrawlFormatJettinson(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+		super(url, content, metadata, nutchConf, config);
+		
+		stackObjects = new ArrayDeque<JSONObject>();
+		stackArrays = new ArrayDeque<JSONArray>();
+	}
+	
+	@Override
+	protected void writeKeyValue(String key, String value) throws IOException {
+		try {
+			stackObjects.getFirst().put(key, value);
+		} catch (JSONException jsone) {
+			throw new IOException(jsone.getMessage());
+		}
+	}
+	
+	@Override
+	protected void writeKeyNull(String key) throws IOException {
+		try {
+			stackObjects.getFirst().put(key, JSONObject.NULL);
+		} catch (JSONException jsone) {
+			throw new IOException(jsone.getMessage());
+		}
+	}
+	
+	@Override
+	protected void startArray(String key, boolean nested, boolean newline) throws IOException {
+		JSONArray array = new JSONArray();
+		stackArrays.push(array);
+	}
+	
+	@Override
+	protected void closeArray(String key, boolean nested, boolean newline) throws IOException {
+		try {
+			if (stackArrays.size() > 1) {
+				JSONArray array = stackArrays.pop();
+				if (nested) {
+					stackArrays.getFirst().put(array);
+				}
+				else {
+					stackObjects.getFirst().put(key, array);
+				}
+			}
+		} catch (JSONException jsone) {
+			throw new IOException(jsone.getMessage());
+		}
+	}
+	
+	@Override
+	protected void writeArrayValue(String value) throws IOException {
+		if (stackArrays.size() > 1) {
+			stackArrays.getFirst().put(value);
+		}
+	}
+	
+	@Override
+	protected void startObject(String key) throws IOException {
+		JSONObject object = new JSONObject();
+		stackObjects.push(object);
+	}
+	
+	@Override
+	protected void closeObject(String key) throws IOException {
+		try {
+			if (stackObjects.size() > 1) {
+				JSONObject object = stackObjects.pop();
+				stackObjects.getFirst().put(key, object);
+			}
+		} catch (JSONException jsone) {
+			throw new IOException(jsone.getMessage());
+		}
+	}
+	
+	@Override
+	protected String generateJson() throws IOException {
+		try {
+			return stackObjects.getFirst().toString(2);
+		} catch (JSONException jsone) {
+			throw new IOException(jsone.getMessage());
+		}
+	}
+}
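
The two deques implement nesting: startObject/startArray push a fresh
container, and the matching close pops it and attaches it to the new head.
Flattened for a single nested object, the underlying Jettison calls look like
this:

    // Illustrative only: the push/pop pattern above, without the stacks.
    import org.codehaus.jettison.json.JSONObject;

    public class JettisonSketch {
      public static void main(String[] args) throws Exception {
        JSONObject root = new JSONObject();    // startObject(null)
        JSONObject inner = new JSONObject();   // startObject("inner")
        inner.put("key", "value");             // writeKeyValue("key", "value")
        root.put("inner", inner);              // closeObject("inner")
        System.out.println(root.toString(2));  // generateJson(), indent of 2
      }
    }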

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
new file mode 100644
index 0000000..a1aaa44
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatSimple.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.protocol.Content;
+
+/**
+ * This class provides methods to map crawled data to JSON using a
+ * {@link StringBuilder} object.
+ *
+ */
+public class CommonCrawlFormatSimple extends AbstractCommonCrawlFormat {
+	
+	private StringBuilder sb;
+	
+	private int tabCount;
+	
+	public CommonCrawlFormatSimple(String url, Content content, Metadata metadata, Configuration nutchConf, CommonCrawlConfig config) throws IOException {
+		super(url, content, metadata, nutchConf, config);
+		
+		this.sb = new StringBuilder();
+		this.tabCount = 0;
+	}
+	
+	@Override
+	protected void writeKeyValue(String key, String value) throws IOException {
+		sb.append(printTabs() + "\"" + key + "\": " + quote(value) + ",\n");
+	}
+	
+	@Override
+	protected void writeKeyNull(String key) throws IOException {
+		sb.append(printTabs() + "\"" + key + "\": null,\n");
+	}
+	
+	@Override
+	protected void startArray(String key, boolean nested, boolean newline) throws IOException {
+		String name = (key != null) ? "\"" + key + "\": " : "";
+		String nl = (newline) ? "\n" : "";
+		sb.append(printTabs() + name + "[" + nl);
+		if (newline) {
+			this.tabCount++;
+		}
+	}
+	
+	@Override
+	protected void closeArray(String key, boolean nested, boolean newline) throws IOException {
+		if (sb.charAt(sb.length()-1) == ',') {
+			sb.deleteCharAt(sb.length()-1); // delete comma
+		}
+		else if (sb.charAt(sb.length()-2) == ',') {
+			sb.deleteCharAt(sb.length()-2); // delete comma
+		}
+		if (newline) {
+			this.tabCount--; // closing an array reduces the indent level
+		}
+		String nl = (newline) ? printTabs() : "";
+		sb.append(nl + "],\n");
+	}
+	
+	@Override
+	protected void writeArrayValue(String value) {
+		sb.append("\"" + value + "\",");
+	}
+	
+	@Override
+	protected void startObject(String key) throws IOException {
+		String name = "";
+		if (key != null) {
+			name = "\"" + key + "\": ";
+		}
+		sb.append(printTabs() + name + "{\n");
+		this.tabCount++;
+	}
+	
+	@Override
+	protected void closeObject(String key) throws IOException {
+		if (sb.charAt(sb.length()-2) == ',') {
+			sb.deleteCharAt(sb.length()-2); // delete comma
+		}
+		this.tabCount--;
+		sb.append(printTabs() + "},\n");
+	}
+	
+	@Override
+	protected String generateJson() throws IOException {
+		sb.deleteCharAt(sb.length()-1); // delete new line
+		sb.deleteCharAt(sb.length()-1); // delete comma
+		return sb.toString();
+	}
+	
+	private String printTabs() {
+		StringBuilder sb = new StringBuilder();
+		for (int i=0; i < this.tabCount ;i++) {
+			sb.append("\t");
+		}
+		return sb.toString();
+	}
+	
+    private static String quote(String string) throws IOException {
+    	StringBuilder sb = new StringBuilder();
+    	
+        if (string == null || string.length() == 0) {
+            sb.append("\"\"");
+            return sb.toString();
+        }
+
+        char b;
+        char c = 0;
+        String hhhh;
+        int i;
+        int len = string.length();
+
+        sb.append('"');
+        for (i = 0; i < len; i += 1) {
+            b = c;
+            c = string.charAt(i);
+            switch (c) {
+            case '\\':
+            case '"':
+                sb.append('\\');
+                sb.append(c);
+                break;
+            case '/':
+                if (b == '<') {
+                	sb.append('\\');
+                }
+                sb.append(c);
+                break;
+            case '\b':
+            	sb.append("\\b");
+                break;
+            case '\t':
+            	sb.append("\\t");
+                break;
+            case '\n':
+            	sb.append("\\n");
+                break;
+            case '\f':
+            	sb.append("\\f");
+                break;
+            case '\r':
+            	sb.append("\\r");
+                break;
+            default:
+                if (c < ' ' || (c >= '\u0080' && c < '\u00a0')
+                        || (c >= '\u2000' && c < '\u2100')) {
+                	sb.append("\\u");
+                    hhhh = Integer.toHexString(c);
+                    sb.append("0000", 0, 4 - hhhh.length());
+                    sb.append(hhhh);
+                } else {
+                	sb.append(c);
+                }
+            }
+        }
+        sb.append('"');
+        return sb.toString();
+    }
+}
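
The only subtle branch in quote() is the \uXXXX escape, which left-pads the
hex form of the character to four digits by appending a slice of "0000". A
tiny sketch of that branch in isolation:

    // Illustrative only: the zero-padded unicode escape from quote(),
    // applied to the bell character U+0007.
    char c = '\u0007';
    StringBuilder sb = new StringBuilder("\\u");
    String hhhh = Integer.toHexString(c);      // "7"
    sb.append("0000", 0, 4 - hhhh.length());   // pad to four hex digits
    sb.append(hhhh);
    System.out.println(sb);                    // prints \u0007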

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
new file mode 100644
index 0000000..191e42e
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/CommonCrawlFormatWARC.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.text.ParseException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.ibm.icu.text.SimpleDateFormat;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.parse.ParseData;
+
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.protocol.Content;
+import org.archive.format.warc.WARCConstants;
+import org.archive.io.WriterPoolMember;
+import org.archive.io.warc.WARCRecordInfo;
+import org.archive.io.warc.WARCWriter;
+import org.archive.io.warc.WARCWriterPoolSettingsData;
+import org.archive.uid.UUIDGenerator;
+import org.archive.util.DateUtils;
+import org.archive.util.anvl.ANVLRecord;
+
+public class CommonCrawlFormatWARC extends AbstractCommonCrawlFormat {
+
+  public static final String MAX_WARC_FILE_SIZE = "warc.file.size.max";
+  public static final String TEMPLATE = "${prefix}-${timestamp17}-${serialno}";
+
+  private static final AtomicInteger SERIALNO = new AtomicInteger();
+  private final static UUIDGenerator GENERATOR = new UUIDGenerator();
+
+  private String outputDir = null;
+  private ByteArrayOutputStream out;
+  private WARCWriter writer;
+  private ParseData parseData;
+
+  public CommonCrawlFormatWARC(Configuration nutchConf,
+      CommonCrawlConfig config) throws IOException {
+    super(null, null, null, nutchConf, config);
+
+    this.out = new ByteArrayOutputStream();
+
+    ANVLRecord info = WARCUtils.getWARCInfoContent(nutchConf);
+    List<String> md = Collections.singletonList(info.toString());
+
+    this.outputDir = config.getOutputDir();
+
+    if (null == outputDir) {
+      throw new RuntimeException("Missing output directory configuration");
+    }
+
+    File file = new File(outputDir);
+
+    long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE;
+
+    if (config.getWarcSize() > 0) {
+      maxSize = config.getWarcSize();
+    }
+
+    WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData(
+        WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize,
+        config.isCompressed(), Arrays.asList(new File[] { file }), md,
+        new UUIDGenerator());
+
+    writer = new WARCWriter(SERIALNO, settings);
+  }
+
+  public CommonCrawlFormatWARC(String url, Content content, Metadata metadata,
+      Configuration nutchConf, CommonCrawlConfig config, ParseData parseData)
+      throws IOException {
+    super(url, content, metadata, nutchConf, config);
+
+    this.out = new ByteArrayOutputStream();
+    this.parseData = parseData;
+
+    ANVLRecord info = WARCUtils.getWARCInfoContent(conf);
+    List<String> md = Collections.singletonList(info.toString());
+
+    this.outputDir = config.getOutputDir();
+
+    if (null == outputDir) {
+      throw new RuntimeException("Missing output directory configuration");
+    }
+
+    File file = new File(outputDir);
+
+    long maxSize = WARCConstants.DEFAULT_MAX_WARC_FILE_SIZE;
+
+    if (config.getWarcSize() > 0) {
+      maxSize = config.getWarcSize();
+    }
+
+    WARCWriterPoolSettingsData settings = new WARCWriterPoolSettingsData(
+        WriterPoolMember.DEFAULT_PREFIX, TEMPLATE, maxSize,
+        config.isCompressed(), Arrays.asList(new File[] { file }), md,
+        new UUIDGenerator());
+
+    writer = new WARCWriter(SERIALNO, settings);
+  }
+
+  public String getJsonData(String url, Content content, Metadata metadata,
+      ParseData parseData) throws IOException {
+    this.url = url;
+    this.content = content;
+    this.metadata = metadata;
+    this.parseData = parseData;
+
+    return this.getJsonData();
+  }
+
+  @Override
+  public String getJsonData() throws IOException {
+
+    long position = writer.getPosition();
+
+    try {
+      // checkSize will open a new file if we exceeded the maxBytes setting
+      writer.checkSize();
+
+      if (writer.getPosition() != position) {
+        // We just closed the file because it was larger than maxBytes.
+        position = writer.getPosition();
+      }
+
+      // response record
+      URI id = writeResponse();
+
+      if (StringUtils.isNotBlank(metadata.get("_request_"))) {
+        // write the request method if any request info is found
+        writeRequest(id);
+      }
+    } catch (IOException e) {
+      // Rethrow the corresponding I/O error
+      throw e;
+    } catch (ParseException e) {
+      // We can't establish a valid WARC-Date for this record, so skip it
+      // altogether.
+      LOG.error("Can't get a valid date from: {}", url);
+    }
+
+    return null;
+  }
+
+  protected URI writeResponse() throws IOException, ParseException {
+    WARCRecordInfo record = new WARCRecordInfo();
+
+    record.setType(WARCConstants.WARCRecordType.response);
+    record.setUrl(getUrl());
+
+    record.setCreate14DigitDate(DateUtils
+        .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+    record.setMimetype(WARCConstants.HTTP_RESPONSE_MIMETYPE);
+    record.setRecordId(GENERATOR.getRecordID());
+
+    String IP = getResponseAddress();
+
+    if (StringUtils.isNotBlank(IP))
+      record.addExtraHeader(WARCConstants.HEADER_KEY_IP, IP);
+
+    if (ParseSegment.isTruncated(content))
+      record.addExtraHeader(WARCConstants.HEADER_KEY_TRUNCATED, "unspecified");
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+    String httpHeaders = metadata.get("_response.headers_");
+
+    if (StringUtils.isNotBlank(httpHeaders)) {
+      output.write(httpHeaders.getBytes());
+    } else {
+      // change the record type to resource, as we do not have information
+      // about the headers
+      record.setType(WARCConstants.WARCRecordType.resource);
+      record.setMimetype(content.getContentType());
+    }
+
+    output.write(getResponseContent().getBytes());
+
+    record.setContentLength(output.size());
+    record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+    if (output.size() > 0) {
+      // avoid generating a 0 sized record, as the webarchive library will
+      // complain about it
+      writer.writeRecord(record);
+    }
+
+    return record.getRecordId();
+  }
+
+  protected URI writeRequest(URI id) throws IOException, ParseException {
+    WARCRecordInfo record = new WARCRecordInfo();
+
+    record.setType(WARCConstants.WARCRecordType.request);
+    record.setUrl(getUrl());
+    record.setCreate14DigitDate(DateUtils
+        .getLog14Date(Long.parseLong(metadata.get("nutch.fetch.time"))));
+    record.setMimetype(WARCConstants.HTTP_REQUEST_MIMETYPE);
+    record.setRecordId(GENERATOR.getRecordID());
+
+    if (id != null) {
+      ANVLRecord headers = new ANVLRecord();
+      headers.addLabelValue(WARCConstants.HEADER_KEY_CONCURRENT_TO,
+          '<' + id.toString() + '>');
+      record.setExtraHeaders(headers);
+    }
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream();
+
+    output.write(metadata.get("_request_").getBytes());
+    record.setContentLength(output.size());
+    record.setContentStream(new ByteArrayInputStream(output.toByteArray()));
+
+    writer.writeRecord(record);
+
+    return record.getRecordId();
+  }
+
+  @Override
+  protected String generateJson() throws IOException {
+    return null;
+  }
+
+  @Override
+  protected void writeKeyValue(String key, String value) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void writeKeyNull(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void startArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void closeArray(String key, boolean nested, boolean newline)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void writeArrayValue(String value) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void startObject(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  protected void closeObject(String key) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void close() {
+    if (writer != null)
+      try {
+        writer.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+  }
+}
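
Despite the getJsonData name, this formatter writes WARC records as a side
effect and returns null; one instance should serve the whole job so that
records accumulate in the same size-rotated files. A lifecycle sketch, with
hypothetical paths, exception handling omitted, and url, content, metadata
and parseData assumed in scope:

    // Illustrative only: job-level WARC output via the factory.
    Configuration conf = NutchConfiguration.create();
    CommonCrawlConfig config = new CommonCrawlConfig();
    config.setOutputDir("/tmp/warc-out");  // required, or the ctor throws
    config.setWarcSize(1000000000L);       // rotate files at ~1 GB
    config.setCompressed(true);            // compressed WARC output
    try (CommonCrawlFormat format = CommonCrawlFormatFactory
        .getCommonCrawlFormat("WARC", conf, config)) {
      format.getJsonData(url, content, metadata, parseData); // per document
    }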