Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:50 UTC

[34/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java b/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java
new file mode 100644
index 0000000..2e50105
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools.warc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.TimeZone;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.martinkl.warc.WARCRecord;
+import com.martinkl.warc.WARCWritable;
+import com.martinkl.warc.mapred.WARCOutputFormat;
+
+/**
+ * MapReduce job to export Nutch segments as WARC files. The file format is
+ * documented in the [ISO
+ * Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
+ * Generates records of type 'response' if the configuration property
+ * 'store.http.headers' was set to true during fetching and the HTTP headers
+ * were stored verbatim; generates records of type 'resource' otherwise.
+ **/
+
+public class WARCExporter extends Configured implements Tool {
+
+  public static Logger LOG = LoggerFactory.getLogger(WARCExporter.class);
+
+  private static final String CRLF = "\r\n";
+  private static final byte[] CRLF_BYTES = { 13, 10 };
+
+  public WARCExporter() {
+    super(null);
+  }
+
+  public WARCExporter(Configuration conf) {
+    super(conf);
+  }
+
+  public static class WARCReducer
+      implements Mapper<Text, Writable, Text, NutchWritable>,
+      Reducer<Text, NutchWritable, NullWritable, WARCWritable> {
+
+    SimpleDateFormat warcdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'",
+        Locale.ENGLISH);
+
+    @Override
+    public void configure(JobConf job) {
+      // the pattern's literal 'Z' denotes UTC, so format fetch times in UTC
+      warcdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    public void map(Text key, Writable value,
+        OutputCollector<Text, NutchWritable> output, Reporter reporter)
+            throws IOException {
+      output.collect(key, new NutchWritable(value));
+    }
+
+    @Override
+    public void reduce(Text key, Iterator<NutchWritable> values,
+        OutputCollector<NullWritable, WARCWritable> output, Reporter reporter)
+            throws IOException {
+
+      Content content = null;
+      CrawlDatum cd = null;
+
+      // aggregate the values found
+      while (values.hasNext()) {
+        final Writable value = values.next().get(); // unwrap
+        if (value instanceof Content) {
+          content = (Content) value;
+          continue;
+        }
+        if (value instanceof CrawlDatum) {
+          cd = (CrawlDatum) value;
+          continue;
+        }
+      }
+
+      // check that we have everything we need
+      if (content == null) {
+        LOG.info("Missing content for {}", key);
+        reporter.getCounter("WARCExporter", "missing content").increment(1);
+        return;
+      }
+
+      if (cd == null) {
+        LOG.info("Missing fetch datum for {}", key);
+        reporter.getCounter("WARCExporter", "missing metadata").increment(1);
+        return;
+      }
+
+      // were the HTTP headers stored verbatim? If so, write a 'response' record
+      String headersVerbatim = content.getMetadata().get("_response.headers_");
+      byte[] httpheaders = new byte[0];
+      if (StringUtils.isNotBlank(headersVerbatim)) {
+        // check that the headers end with an empty line
+        if (!headersVerbatim.endsWith(CRLF + CRLF)) {
+          headersVerbatim += CRLF + CRLF;
+        }
+        httpheaders = headersVerbatim.getBytes("UTF-8");
+      }
+
+      StringBuilder buffer = new StringBuilder();
+      buffer.append(WARCRecord.WARC_VERSION);
+      buffer.append(CRLF);
+
+      buffer.append("WARC-Record-ID").append(": ").append("<urn:uuid:")
+          .append(UUID.randomUUID().toString()).append(">").append(CRLF);
+
+      // content was null-checked above; start from the payload length
+      int contentLength = content.getContent().length;
+
+      // add the length of the http headers
+      contentLength += httpheaders.length;
+
+      buffer.append("Content-Length").append(": ")
+          .append(Integer.toString(contentLength)).append(CRLF);
+
+      Date fetchedDate = new Date(cd.getFetchTime());
+      buffer.append("WARC-Date").append(": ").append(warcdf.format(fetchedDate))
+          .append(CRLF);
+
+      // check if http headers have been stored verbatim
+      // if not generate a response instead
+      String WARCTypeValue = "resource";
+
+      if (StringUtils.isNotBlank(headersVerbatim)) {
+        WARCTypeValue = "response";
+      }
+
+      buffer.append("WARC-Type").append(": ").append(WARCTypeValue)
+          .append(CRLF);
+
+      // "WARC-IP-Address" if present
+      String IP = content.getMetadata().get("_ip_");
+      if (StringUtils.isNotBlank(IP)) {
+        buffer.append("WARC-IP-Address").append(": ").append("IP").append(CRLF);
+      }
+
+      // detect if truncated, only for fetch success (compare the status byte
+      // directly; getStatusName() returns names like "fetch_success")
+      if (cd.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS
+          && ParseSegment.isTruncated(content)) {
+        buffer.append("WARC-Truncated").append(": ").append("unspecified")
+            .append(CRLF);
+      }
+
+      // must be a valid URI
+      try {
+        String normalised = key.toString().replaceAll(" ", "%20");
+        URI uri = URI.create(normalised);
+        buffer.append("WARC-Target-URI").append(": ")
+            .append(uri.toASCIIString()).append(CRLF);
+      } catch (Exception e) {
+        LOG.error("Invalid URI {} ", key);
+        reporter.getCounter("WARCExporter", "invalid URI").increment(1);
+        return;
+      }
+
+      // provide a ContentType if type response
+      if (WARCTypeValue.equals("response")) {
+        buffer.append("Content-Type: application/http; msgtype=response")
+            .append(CRLF);
+      }
+
+      // finished writing the WARC headers, now let's serialize it
+
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+      // store the headers
+      bos.write(buffer.toString().getBytes("UTF-8"));
+      bos.write(CRLF_BYTES);
+      // the http headers
+      bos.write(httpheaders);
+
+      // the binary content itself
+      if (content.getContent() != null) {
+        bos.write(content.getContent());
+      }
+      bos.write(CRLF_BYTES);
+      bos.write(CRLF_BYTES);
+
+      try {
+        DataInput in = new DataInputStream(
+            new ByteArrayInputStream(bos.toByteArray()));
+        WARCRecord record = new WARCRecord(in);
+        output.collect(NullWritable.get(), new WARCWritable(record));
+        reporter.getCounter("WARCExporter", "records generated").increment(1);
+      } catch (IOException exception) {
+        LOG.error("Exception when generating WARC record for {} : {}", key,
+            exception.getMessage());
+        reporter.getCounter("WARCExporter", "exception").increment(1);
+      }
+
+    }
+  }
+
+  public int generateWARC(String output, List<Path> segments) {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("WARCExporter: starting at {}", sdf.format(start));
+
+    final JobConf job = new NutchJob(getConf());
+    job.setJobName("warc-exporter " + output);
+
+    for (final Path segment : segments) {
+      LOG.info("warc-exporter: adding segment: {}", segment);
+      FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+      FileInputFormat.addInputPath(job,
+          new Path(segment, CrawlDatum.FETCH_DIR_NAME));
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(WARCReducer.class);
+    job.setReducerClass(WARCReducer.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+
+    FileOutputFormat.setOutputPath(job, new Path(output));
+    // using the old api
+    job.setOutputFormat(WARCOutputFormat.class);
+
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(WARCWritable.class);
+
+    try {
+      RunningJob rj = JobClient.runJob(job);
+      LOG.info(rj.getCounters().toString());
+      long end = System.currentTimeMillis();
+      LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end),
+          TimingUtil.elapsedTime(start, end));
+    } catch (Exception e) {
+      LOG.error("Exception caught", e);
+      return -1;
+    }
+
+    return 0;
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err.println(
+          "Usage: WARCExporter <output> (<segment> ... | -dir <segments>)");
+      return -1;
+    }
+
+    final List<Path> segments = new ArrayList<Path>();
+
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-dir")) {
+        Path dir = new Path(args[++i]);
+        FileSystem fs = dir.getFileSystem(getConf());
+        FileStatus[] fstats = fs.listStatus(dir,
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+        Path[] files = HadoopFSUtil.getPaths(fstats);
+        for (Path p : files) {
+          segments.add(p);
+        }
+      } else {
+        segments.add(new Path(args[i]));
+      }
+    }
+
+    return generateWARC(args[0], segments);
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new WARCExporter(), args);
+    System.exit(res);
+  }
+}

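For reference, the job can also be driven programmatically through
generateWARC(). A minimal sketch, assuming a configured Nutch classpath; the
class name and the segment/output paths below are illustrative only:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.tools.warc.WARCExporter;
    import org.apache.nutch.util.NutchConfiguration;

    /** Illustrative driver, equivalent to the command-line entry point. */
    public class WARCExporterExample {
      public static void main(String[] args) {
        WARCExporter exporter = new WARCExporter(NutchConfiguration.create());
        List<Path> segments = new ArrayList<Path>();
        segments.add(new Path("crawl/segments/20160716000000"));
        // writes WARC files under /output/warc; returns 0 on success, -1 on error
        int rc = exporter.generateWARC("/output/warc", segments);
        System.exit(rc);
      }
    }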
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java b/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
new file mode 100644
index 0000000..44e1a94
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Tools to import / export between Nutch segments and
+ * <a href="http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf">
+ * WARC archives</a>.
+ */
+package org.apache.nutch.tools.warc;

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java b/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
new file mode 100644
index 0000000..593d590
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Adapted by John Xing for Nutch Project from
+ * http://blog.fivesight.com/prb/space/Call+an+External+Command+from+Java/,
+ * which explains the code in detail.
+ * [Original author is moving his site to http://mult.ifario.us/   -peb]
+ *
+ * Comments by John Xing on 20040621:
+ * (1) EDU.oswego.cs.dl.util.concurrent.* is in j2sdk 1.5 now.
+ *     Modifications are needed if we move to j2sdk 1.5.
+ * (2) The original looks good, not much to change.
+ *
+ * This code is in the public domain and comes with no warranty.  
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class CommandRunner {
+
+  private boolean _waitForExit = true;
+  private String _command;
+  private int _timeout = 10;
+
+  private InputStream _stdin;
+  private OutputStream _stdout;
+  private OutputStream _stderr;
+
+  private static final int BUF = 4096;
+
+  private int _xit;
+
+  private Throwable _thrownError;
+
+  private CyclicBarrier _barrier;
+
+  public int getExitValue() {
+    return _xit;
+  }
+
+  public void setCommand(String s) {
+    _command = s;
+  }
+
+  public String getCommand() {
+    return _command;
+  }
+
+  public void setInputStream(InputStream is) {
+    _stdin = is;
+  }
+
+  public void setStdOutputStream(OutputStream os) {
+    _stdout = os;
+  }
+
+  public void setStdErrorStream(OutputStream os) {
+    _stderr = os;
+  }
+
+  public void evaluate() throws IOException {
+    this.exec();
+  }
+
+  /**
+   * 
+   * @return process exit value (return code) or -1 if timed out.
+   * @throws IOException
+   */
+  public int exec() throws IOException {
+    Process proc = Runtime.getRuntime().exec(_command);
+    _barrier = new CyclicBarrier(3 + ((_stdin != null) ? 1 : 0));
+
+    PullerThread so = new PullerThread("STDOUT", proc.getInputStream(), _stdout);
+    so.setDaemon(true);
+    so.start();
+
+    PullerThread se = new PullerThread("STDERR", proc.getErrorStream(), _stderr);
+    se.setDaemon(true);
+    se.start();
+
+    PusherThread si = null;
+    if (_stdin != null) {
+      si = new PusherThread("STDIN", _stdin, proc.getOutputStream());
+      si.setDaemon(true);
+      si.start();
+    }
+
+    boolean _timedout = false;
+    long end = System.currentTimeMillis() + _timeout * 1000;
+
+    //
+    try {
+      if (_timeout == 0) {
+        _barrier.await();
+      } else {
+        _barrier.await(_timeout, TimeUnit.SECONDS);
+      }
+    } catch (TimeoutException ex) {
+      _timedout = true;
+    } catch (BrokenBarrierException bbe) {
+      /* IGNORE */
+    } catch (InterruptedException e) {
+      /* IGNORE */
+    }
+
+    // tell the io threads we are finished
+    if (si != null) {
+      si.interrupt();
+    }
+    so.interrupt();
+    se.interrupt();
+
+    _xit = -1;
+
+    if (!_timedout) {
+      if (_waitForExit) {
+        do {
+          try {
+            Thread.sleep(1000);
+            _xit = proc.exitValue();
+          } catch (InterruptedException ie) {
+            if (Thread.interrupted()) {
+              break; // stop waiting on an interrupt for this thread
+            } else {
+              continue;
+            }
+          } catch (IllegalThreadStateException iltse) {
+            continue;
+          }
+          break;
+        } while (!(_timedout = (System.currentTimeMillis() > end)));
+      } else {
+        try {
+          _xit = proc.exitValue();
+        } catch (IllegalThreadStateException iltse) {
+          _timedout = true;
+        }
+      }
+    }
+
+    if (_waitForExit) {
+      proc.destroy();
+    }
+    return _xit;
+  }
+
+  public Throwable getThrownError() {
+    return _thrownError;
+  }
+
+  private class PumperThread extends Thread {
+
+    private OutputStream _os;
+    private InputStream _is;
+
+    private boolean _closeInput;
+
+    protected PumperThread(String name, InputStream is, OutputStream os,
+        boolean closeInput) {
+      super(name);
+      _is = is;
+      _os = os;
+      _closeInput = closeInput;
+    }
+
+    public void run() {
+      try {
+        byte[] buf = new byte[BUF];
+        int read = 0;
+        while (!isInterrupted() && (read = _is.read(buf)) != -1) {
+          if (read == 0)
+            continue;
+          _os.write(buf, 0, read);
+          _os.flush();
+        }
+      } catch (InterruptedIOException iioe) {
+        // ignored
+      } catch (Throwable t) {
+        _thrownError = t;
+      } finally {
+        try {
+          if (_closeInput) {
+            _is.close();
+          } else {
+            _os.close();
+          }
+        } catch (IOException ioe) {
+          /* IGNORE */
+        }
+      }
+      try {
+        _barrier.await();
+      } catch (InterruptedException ie) {
+        /* IGNORE */
+      } catch (BrokenBarrierException bbe) {
+        /* IGNORE */
+      }
+    }
+  }
+
+  private class PusherThread extends PumperThread {
+    PusherThread(String name, InputStream is, OutputStream os) {
+      super(name, is, os, false);
+    }
+  }
+
+  private class PullerThread extends PumperThread {
+    PullerThread(String name, InputStream is, OutputStream os) {
+      super(name, is, os, true);
+    }
+  }
+
+  public int getTimeout() {
+    return _timeout;
+  }
+
+  public void setTimeout(int timeout) {
+    _timeout = timeout;
+  }
+
+  public boolean getWaitForExit() {
+    return _waitForExit;
+  }
+
+  public void setWaitForExit(boolean waitForExit) {
+    _waitForExit = waitForExit;
+  }
+
+  public static void main(String[] args) throws Exception {
+    String commandPath = null;
+    String filePath = null;
+    int timeout = 10;
+
+    String usage = "Usage: CommandRunner [-timeout timeoutSecs] commandPath filePath";
+
+    if (args.length < 2) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-timeout")) {
+        timeout = Integer.parseInt(args[++i]);
+      } else if (i != args.length - 2) {
+        System.err.println(usage);
+        System.exit(-1);
+      } else {
+        commandPath = args[i];
+        filePath = args[++i];
+      }
+    }
+
+    CommandRunner cr = new CommandRunner();
+
+    cr.setCommand(commandPath);
+    cr.setInputStream(new java.io.FileInputStream(filePath));
+    cr.setStdErrorStream(System.err);
+    cr.setStdOutputStream(System.out);
+
+    cr.setTimeout(timeout);
+
+    cr.evaluate();
+
+    System.err.println("output value: " + cr.getExitValue());
+  }
+}

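For reference, a minimal sketch of the CommandRunner API as exercised by
main() above; the command and the piped input are illustrative:

    import java.io.ByteArrayInputStream;

    import org.apache.nutch.util.CommandRunner;

    public class CommandRunnerExample {
      public static void main(String[] args) throws Exception {
        CommandRunner cr = new CommandRunner();
        cr.setCommand("wc -w");                       // external command to run
        cr.setInputStream(new ByteArrayInputStream(
            "three little words".getBytes("UTF-8"))); // piped to its stdin
        cr.setStdOutputStream(System.out);            // where stdout is copied
        cr.setStdErrorStream(System.err);             // where stderr is copied
        cr.setTimeout(5);                             // seconds; 0 waits forever
        cr.evaluate();                                // runs the command
        System.out.println("exit value: " + cr.getExitValue()); // -1 on timeout
      }
    }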
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java b/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java
new file mode 100644
index 0000000..8aafe59
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.MissingOptionException;
+
+/**
+ * Extracts some simple crawl completion stats from the crawldb.
+ *
+ * Stats will be sorted by host/domain and will be of the form:
+ * <pre>
+ * 1   www.spitzer.caltech.edu FETCHED
+ * 50  www.spitzer.caltech.edu UNFETCHED
+ * </pre>
+ */
+public class CrawlCompletionStats extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CrawlCompletionStats.class);
+
+  private static final int MODE_HOST = 1;
+  private static final int MODE_DOMAIN = 2;
+
+  private int mode = 0;
+
+  public int run(String[] args) throws Exception {
+    Option helpOpt = new Option("h", "help", false, "Show this message");
+    Option inDirs = OptionBuilder
+        .withArgName("inputDirs")
+        .isRequired()
+        .withDescription("Comma separated list of crawl directories (e.g., \"./crawl1,./crawl2\")")
+        .hasArgs()
+        .create("inputDirs");
+    Option outDir = OptionBuilder
+        .withArgName("outputDir")
+        .isRequired()
+        .withDescription("Output directory where results should be dumped")
+        .hasArgs()
+        .create("outputDir");
+    Option modeOpt = OptionBuilder
+        .withArgName("mode")
+        .isRequired()
+        .withDescription("Set statistics gathering mode (by 'host' or by 'domain')")
+        .hasArgs()
+        .create("mode");
+    Option numReducers = OptionBuilder
+        .withArgName("numReducers")
+        .withDescription("Optional number of reduce jobs to use. Defaults to 1")
+        .hasArgs()
+        .create("numReducers");
+
+    Options options = new Options();
+    options.addOption(helpOpt);
+    options.addOption(inDirs);
+    options.addOption(outDir);
+    options.addOption(modeOpt);
+    options.addOption(numReducers);
+
+    CommandLineParser parser = new GnuParser();
+    CommandLine cli;
+
+    try {
+      cli = parser.parse(options, args);
+    } catch (MissingOptionException e) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("CrawlCompletionStats", options, true);
+      return 1;
+    }
+
+    if (cli.hasOption("help")) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("CrawlCompletionStats", options, true);
+      return 1;
+    }
+
+    String inputDir = cli.getOptionValue("inputDirs");
+    String outputDir = cli.getOptionValue("outputDir");
+
+    int numOfReducers = 1;
+    if (cli.hasOption("numReducers")) {
+      numOfReducers = Integer.parseInt(cli.getOptionValue("numReducers"));
+    }
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("CrawlCompletionStats: starting at {}", sdf.format(start));
+
+    int mode = 0;
+    String jobName = "CrawlCompletionStats";
+    if (cli.getOptionValue("mode").equals("host")) {
+      jobName = "Host CrawlCompletionStats";
+      mode = MODE_HOST;
+    } else if (cli.getOptionValue("mode").equals("domain")) {
+      jobName = "Domain CrawlCompletionStats";
+      mode = MODE_DOMAIN;
+    } else {
+      System.err.println("Mode must be one of 'host' or 'domain'");
+      return 1;
+    }
+
+    Configuration conf = getConf();
+    conf.setInt("domain.statistics.mode", mode);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    Job job = Job.getInstance(conf, jobName);
+    job.setJarByClass(CrawlCompletionStats.class);
+
+    String[] inputDirsSpecs = inputDir.split(",");
+    for (int i = 0; i < inputDirsSpecs.length; i++) {
+      File completeInputPath = new File(new File(inputDirsSpecs[i]), "crawldb/current");
+      FileInputFormat.addInputPath(job, new Path(completeInputPath.toString()));
+      
+    }
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileOutputFormat.setOutputPath(job, new Path(outputDir));
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    job.setOutputKeyClass(LongWritable.class);
+    job.setOutputValueClass(Text.class);
+
+    job.setMapperClass(CrawlCompletionStatsMapper.class);
+    job.setReducerClass(CrawlCompletionStatsReducer.class);
+    job.setCombinerClass(CrawlCompletionStatsCombiner.class);
+    job.setNumReduceTasks(numOfReducers);
+
+    job.waitForCompletion(true);
+
+    long end = System.currentTimeMillis();
+    LOG.info("CrawlCompletionStats: finished at {}, elapsed: {}",
+      sdf.format(end), TimingUtil.elapsedTime(start, end));
+    return 0;
+  }
+
+  static class CrawlCompletionStatsMapper extends
+      Mapper<Text, CrawlDatum, Text, LongWritable> {
+    int mode = 0;
+
+    public void setup(Context context) {
+      mode = context.getConfiguration().getInt("domain.statistics.mode", MODE_DOMAIN);
+    }
+
+    public void map(Text urlText, CrawlDatum datum, Context context)
+        throws IOException, InterruptedException {
+
+      URL url = new URL(urlText.toString());
+      String out = "";
+      switch (mode) {
+        case MODE_HOST:
+          out = url.getHost();
+          break;
+        case MODE_DOMAIN:
+          out = URLUtil.getDomainName(url);
+          break;
+      }
+
+      if (datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+          || datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+        context.write(new Text(out + " FETCHED"), new LongWritable(1));
+      } else {
+        context.write(new Text(out + " UNFETCHED"), new LongWritable(1));
+      }
+    }
+  }
+
+  static class CrawlCompletionStatsReducer extends
+      Reducer<Text, LongWritable, LongWritable, Text> {
+    public void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
+      long total = 0;
+
+      for (LongWritable val : values) {
+        total += val.get();
+      }
+
+      context.write(new LongWritable(total), key);
+    }
+  }
+
+  public static class CrawlCompletionStatsCombiner extends
+      Reducer<Text, LongWritable, Text, LongWritable> {
+    public void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
+      long total = 0;
+
+      for (LongWritable val : values) {
+        total += val.get();
+      }
+      context.write(key, new LongWritable(total));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(NutchConfiguration.create(), new CrawlCompletionStats(), args);
+  }
+}

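For reference, the tool parses its options with GnuParser, so it can also be
launched through ToolRunner; a minimal sketch with illustrative paths:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.util.CrawlCompletionStats;
    import org.apache.nutch.util.NutchConfiguration;

    public class CrawlCompletionStatsExample {
      public static void main(String[] args) throws Exception {
        // same as: -inputDirs ./crawl1,./crawl2 -outputDir ./stats -mode domain
        ToolRunner.run(NutchConfiguration.create(), new CrawlCompletionStats(),
            new String[] { "-inputDirs", "./crawl1,./crawl2",
                "-outputDir", "./stats", "-mode", "domain" });
      }
    }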
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java
new file mode 100644
index 0000000..5863522
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import java.util.zip.DeflaterOutputStream;
+
+// Slf4j Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of utility methods for working on deflated data.
+ */
+public class DeflateUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DeflateUtils.class);
+  private static final int EXPECTED_COMPRESSION_RATIO = 5;
+  private static final int BUF_SIZE = 4096;
+
+  /**
+   * Returns an inflated copy of the input array. If the deflated input has been
+   * truncated or corrupted, a best-effort attempt is made to inflate as much as
+   * possible; if no data can be extracted an empty array is returned.
+   */
+  public static final byte[] inflateBestEffort(byte[] in) {
+    return inflateBestEffort(in, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Returns an inflated copy of the input array, truncated to
+   * <code>sizeLimit</code> bytes, if necessary. If the deflated input has been
+   * truncated or corrupted, a best-effort attempt is made to inflate as much as
+   * possible; if no data can be extracted an empty array is returned.
+   */
+  public static final byte[] inflateBestEffort(byte[] in, int sizeLimit) {
+    // decompress using InflaterInputStream
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+        EXPECTED_COMPRESSION_RATIO * in.length);
+
+    // "true" because HTTP does not provide zlib headers
+    Inflater inflater = new Inflater(true);
+    InflaterInputStream inStream = new InflaterInputStream(
+        new ByteArrayInputStream(in), inflater);
+
+    byte[] buf = new byte[BUF_SIZE];
+    int written = 0;
+    while (true) {
+      try {
+        int size = inStream.read(buf);
+        if (size <= 0)
+          break;
+        if ((written + size) > sizeLimit) {
+          outStream.write(buf, 0, sizeLimit - written);
+          break;
+        }
+        outStream.write(buf, 0, size);
+        written += size;
+      } catch (Exception e) {
+        LOG.info("Caught Exception in inflateBestEffort", e);
+        break;
+      }
+    }
+    try {
+      outStream.close();
+    } catch (IOException e) {
+      // ignore failure to close an in-memory stream
+    }
+
+    return outStream.toByteArray();
+  }
+
+  /**
+   * Returns an inflated copy of the input array.
+   * 
+   * @throws IOException
+   *           if the input cannot be properly decompressed
+   */
+  public static final byte[] inflate(byte[] in) throws IOException {
+    // decompress using InflaterInputStream
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+        EXPECTED_COMPRESSION_RATIO * in.length);
+
+    InflaterInputStream inStream = new InflaterInputStream(
+        new ByteArrayInputStream(in));
+
+    byte[] buf = new byte[BUF_SIZE];
+    while (true) {
+      int size = inStream.read(buf);
+      if (size <= 0)
+        break;
+      outStream.write(buf, 0, size);
+    }
+    outStream.close();
+
+    return outStream.toByteArray();
+  }
+
+  /**
+   * Returns a deflated copy of the input array.
+   */
+  public static final byte[] deflate(byte[] in) {
+    // compress using DeflaterOutputStream
+    ByteArrayOutputStream byteOut = new ByteArrayOutputStream(in.length
+        / EXPECTED_COMPRESSION_RATIO);
+
+    DeflaterOutputStream outStream = new DeflaterOutputStream(byteOut);
+
+    try {
+      outStream.write(in);
+    } catch (Exception e) {
+      LOG.error("Error compressing: ", e);
+    }
+
+    try {
+      outStream.close();
+    } catch (IOException e) {
+      LOG.error("Error closing: ", e);
+    }
+
+    return byteOut.toByteArray();
+  }
+}

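For reference, a round-trip sketch: deflate() writes a zlib-wrapped stream
that inflate() can read back, while inflateBestEffort() expects raw
(headerless) deflate data as typically sent over HTTP:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.nutch.util.DeflateUtils;

    public class DeflateUtilsExample {
      public static void main(String[] args) throws IOException {
        byte[] original = "hello hello hello".getBytes(StandardCharsets.UTF_8);
        byte[] compressed = DeflateUtils.deflate(original);  // zlib-wrapped
        byte[] restored = DeflateUtils.inflate(compressed);  // throws on corrupt input
        System.out.println(new String(restored, StandardCharsets.UTF_8));
      }
    }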
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java
new file mode 100644
index 0000000..9595bf4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.xerces.parsers.DOMParser;
+import org.w3c.dom.Element;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+// Slf4j Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DomUtil {
+
+  private final static Logger LOG = LoggerFactory.getLogger(DomUtil.class);
+
+  /**
+   * Returns the parsed DOM tree, or null if parsing failed.
+   * 
+   * @param is the {@link InputStream} to parse (read as UTF-8)
+   * @return the parsed DOM tree from the given {@link InputStream}, or null
+   */
+  public static Element getDom(InputStream is) {
+
+    Element element = null;
+
+    DOMParser parser = new DOMParser();
+
+    InputSource input;
+    try {
+      input = new InputSource(is);
+      input.setEncoding("UTF-8");
+      parser.parse(input);
+      int i = 0;
+      while (!(parser.getDocument().getChildNodes().item(i) instanceof Element)) {
+        i++;
+      }
+      element = (Element) parser.getDocument().getChildNodes().item(i);
+    } catch (FileNotFoundException e) {
+      LOG.error("Error: ", e);
+    } catch (SAXException e) {
+      LOG.error("Error: ", e);
+    } catch (IOException e) {
+      LOG.error("Error: ", e);
+    }
+    return element;
+  }
+
+  /**
+   * Saves the DOM element into the given output stream.
+   * 
+   * @param os the {@link OutputStream} to write to
+   * @param e the {@link Element} to serialize
+   */
+  public static void saveDom(OutputStream os, Element e) {
+
+    DOMSource source = new DOMSource(e);
+    TransformerFactory transFactory = TransformerFactory.newInstance();
+    Transformer transformer;
+    try {
+      transformer = transFactory.newTransformer();
+      transformer.setOutputProperty("indent", "yes");
+      StreamResult result = new StreamResult(os);
+      transformer.transform(source, result);
+      os.flush();
+    } catch (UnsupportedEncodingException e1) {
+      LOG.error("Error: ", e1);
+    } catch (IOException e1) {
+      LOG.error("Error: ", e1);
+    } catch (TransformerConfigurationException e2) {
+      LOG.error("Error: ", e2);
+    } catch (TransformerException ex) {
+      LOG.error("Error: ", ex);
+    }
+  }
+}

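For reference, a minimal sketch of the two helpers: parse a small document
and pretty-print it back out (the XML snippet is illustrative):

    import java.io.ByteArrayInputStream;
    import java.nio.charset.StandardCharsets;

    import org.apache.nutch.util.DomUtil;
    import org.w3c.dom.Element;

    public class DomUtilExample {
      public static void main(String[] args) {
        byte[] xml = "<root><child>text</child></root>"
            .getBytes(StandardCharsets.UTF_8);
        // getDom() returns the document element, or null if parsing failed
        Element root = DomUtil.getDom(new ByteArrayInputStream(xml));
        if (root != null) {
          DomUtil.saveDom(System.out, root); // serialized with indent=yes
        }
      }
    }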
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java
new file mode 100644
index 0000000..9ed3e75
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class DumpFileUtil {
+    private static final Logger LOG = LoggerFactory.getLogger(DumpFileUtil.class
+            .getName());
+
+    private final static String DIR_PATTERN = "%s/%s/%s";
+    private final static String FILENAME_PATTERN = "%s_%s.%s";
+    private final static Integer MAX_LENGTH_OF_FILENAME = 32;
+    private final static Integer MAX_LENGTH_OF_EXTENSION = 5; 
+   
+    public static String getUrlMD5(String url) {
+        byte[] digest = MD5Hash.digest(url).getDigest();
+
+        StringBuilder sb = new StringBuilder();
+        for (byte b : digest) {
+            sb.append(String.format("%02x", b & 0xff));
+        }
+
+        return sb.toString();
+    }
+
+    public static String createTwoLevelsDirectory(String basePath, String md5, boolean makeDir) {
+        String firstLevelDirName = new StringBuilder().append(md5.charAt(0)).append(md5.charAt(8)).toString();
+        String secondLevelDirName = new StringBuilder().append(md5.charAt(16)).append(md5.charAt(24)).toString();
+
+        String fullDirPath = String.format(DIR_PATTERN, basePath, firstLevelDirName, secondLevelDirName);
+
+        if (makeDir) {
+	        try {
+	            FileUtils.forceMkdir(new File(fullDirPath));
+	        } catch (IOException e) {
+	            LOG.error("Failed to create dir: {}", fullDirPath);
+	            fullDirPath = null;
+	        }
+        }
+
+        return fullDirPath;
+    }
+    
+    public static String createTwoLevelsDirectory(String basePath, String md5) {
+        return createTwoLevelsDirectory(basePath, md5, true);
+    }
+
+    public static String createFileName(String md5, String fileBaseName, String fileExtension) {
+        if (fileBaseName.length() > MAX_LENGTH_OF_FILENAME) {
+            LOG.info("File name is too long. Truncated to {} characters.", MAX_LENGTH_OF_FILENAME);
+            fileBaseName = StringUtils.substring(fileBaseName, 0, MAX_LENGTH_OF_FILENAME);
+        } 
+        
+        if (fileExtension.length() > MAX_LENGTH_OF_EXTENSION) {
+            LOG.info("File extension is too long. Truncated to {} characters.", MAX_LENGTH_OF_EXTENSION);
+            fileExtension = StringUtils.substring(fileExtension, 0, MAX_LENGTH_OF_EXTENSION);
+        }
+
+        // strip '?' to prevent FileNotFoundException (invalid argument) on *nix
+        fileBaseName = fileBaseName.replaceAll("\\?", "");
+        fileExtension = fileExtension.replaceAll("\\?", "");
+
+        return String.format(FILENAME_PATTERN, md5, fileBaseName, fileExtension);
+    }
+    
+    public static String createFileNameFromUrl(String basePath, String reverseKey, String urlString, String epochScrapeTime, String fileExtension, boolean makeDir) {
+        String fullDirPath = basePath + File.separator + reverseKey
+                + File.separator + DigestUtils.sha1Hex(urlString);
+
+        if (makeDir) {
+            try {
+                FileUtils.forceMkdir(new File(fullDirPath));
+            } catch (IOException e) {
+                LOG.error("Failed to create dir: {}", fullDirPath);
+                fullDirPath = null;
+            }
+        }
+
+        if (fileExtension.length() > MAX_LENGTH_OF_EXTENSION) {
+            LOG.info("File extension is too long. Truncated to {} characters.", MAX_LENGTH_OF_EXTENSION);
+            fileExtension = StringUtils.substring(fileExtension, 0, MAX_LENGTH_OF_EXTENSION);
+        }
+
+        return fullDirPath + File.separator + epochScrapeTime + "." + fileExtension;
+    }
+    
+    public static String displayFileTypes(Map<String, Integer> typeCounts, Map<String, Integer> filteredCounts) {
+        StringBuilder builder = new StringBuilder();
+        // print total stats
+        builder.append("\nTOTAL Stats:\n");
+        builder.append("[\n");
+        int mimetypeCount = 0;
+        for (String mimeType : typeCounts.keySet()) {
+            builder.append("    {\"mimeType\":\"");
+            builder.append(mimeType);
+            builder.append("\",\"count\":\"");
+            builder.append(typeCounts.get(mimeType));
+            builder.append("\"}\n");
+            mimetypeCount += typeCounts.get(mimeType);
+        }
+        builder.append("]\n");
+        builder.append("Total count: " + mimetypeCount + "\n");
+        // filtered types stats
+        mimetypeCount = 0;
+        if (!filteredCounts.isEmpty()) {
+            builder.append("\nFILTERED Stats:\n");
+            builder.append("[\n");
+            for (String mimeType : filteredCounts.keySet()) {
+                builder.append("    {\"mimeType\":\"");
+                builder.append(mimeType);
+                builder.append("\",\"count\":\"");
+                builder.append(filteredCounts.get(mimeType));
+                builder.append("\"}\n");
+                mimetypeCount += filteredCounts.get(mimeType);
+            }
+            builder.append("]\n");
+            builder.append("Total filtered count: " + mimetypeCount + "\n");
+        }
+        return builder.toString();
+    }
+}

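For reference, a sketch of how the hashing and naming helpers compose (the
URL is illustrative; makeDir=false avoids touching the filesystem):

    import org.apache.nutch.util.DumpFileUtil;

    public class DumpFileUtilExample {
      public static void main(String[] args) {
        String md5 = DumpFileUtil.getUrlMD5("http://nutch.apache.org/");
        // two directory levels built from hex chars 0/8 and 16/24 of the MD5
        String dir = DumpFileUtil.createTwoLevelsDirectory("/tmp/dump", md5, false);
        // <md5>_<basename>.<extension>, both parts length-limited
        String name = DumpFileUtil.createFileName(md5, "index", "html");
        System.out.println(dir + "/" + name);
      }
    }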
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java b/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java
new file mode 100644
index 0000000..4e62dd3
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.metadata.Metadata;
+import org.apache.nutch.net.protocols.Response;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.NutchConfiguration;
+
+import com.ibm.icu.text.CharsetDetector;
+import com.ibm.icu.text.CharsetMatch;
+
+/**
+ * A simple class for detecting character encodings.
+ * 
+ * <p>
+ * Broadly this encompasses two functions, which are distinctly separate:
+ * 
+ * <ol>
+ * <li>Auto detecting a set of "clues" from input text.</li>
+ * <li>Taking a set of clues and making a "best guess" as to the "real"
+ * encoding.</li>
+ * </ol>
+ * </p>
+ * 
+ * <p>
+ * A caller will often have some extra information about what the encoding might
+ * be (e.g. from the HTTP header or HTML meta-tags, often wrong but still
+ * potentially useful clues). The types of clues may differ from caller to
+ * caller. Thus a typical calling sequence is:
+ * <ul>
+ * <li>Run step (1) to generate a set of auto-detected clues;</li>
+ * <li>Combine these clues with the caller-dependent "extra clues" available;</li>
+ * <li>Run step (2) to guess what the most probable answer is.</li>
+ * </ul>
+ * </p>
+ */
+public class EncodingDetector {
+
+  private class EncodingClue {
+    private String value;
+    private String source;
+    private int confidence;
+
+    // Constructor for clues with no confidence values (ignore thresholds)
+    public EncodingClue(String value, String source) {
+      this(value, source, NO_THRESHOLD);
+    }
+
+    public EncodingClue(String value, String source, int confidence) {
+      this.value = value.toLowerCase();
+      this.source = source;
+      this.confidence = confidence;
+    }
+
+    public String getSource() {
+      return source;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public String toString() {
+      return value + " (" + source
+          + ((confidence >= 0) ? ", " + confidence + "% confidence" : "") + ")";
+    }
+
+    public boolean isEmpty() {
+      return (value == null || "".equals(value));
+    }
+
+    public boolean meetsThreshold() {
+      return (confidence < 0 || (minConfidence >= 0 && confidence >= minConfidence));
+    }
+  }
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(EncodingDetector.class);
+
+  public static final int NO_THRESHOLD = -1;
+
+  public static final String MIN_CONFIDENCE_KEY = "encodingdetector.charset.min.confidence";
+
+  private static final HashMap<String, String> ALIASES = new HashMap<String, String>();
+
+  private static final HashSet<String> DETECTABLES = new HashSet<String>();
+
+  // CharsetDetector will die without a minimum amount of data.
+  private static final int MIN_LENGTH = 4;
+
+  static {
+    DETECTABLES.add("text/html");
+    DETECTABLES.add("text/plain");
+    DETECTABLES.add("text/richtext");
+    DETECTABLES.add("text/rtf");
+    DETECTABLES.add("text/sgml");
+    DETECTABLES.add("text/tab-separated-values");
+    DETECTABLES.add("text/xml");
+    DETECTABLES.add("application/rss+xml");
+    DETECTABLES.add("application/xhtml+xml");
+    /*
+     * the following map is not an alias mapping table, but maps character
+     * encodings which are often used in mislabelled documents to their correct
+     * encodings. For instance, there are a lot of documents labelled
+     * 'ISO-8859-1' which contain characters not covered by ISO-8859-1 but
+     * covered by windows-1252. Because windows-1252 is a superset of ISO-8859-1
+     * (sharing code points for the common part), it's better to treat
+     * ISO-8859-1 as synonymous with windows-1252 than to reject, as invalid,
+     * documents labelled as ISO-8859-1 that have characters outside ISO-8859-1.
+     */
+    ALIASES.put("ISO-8859-1", "windows-1252");
+    ALIASES.put("EUC-KR", "x-windows-949");
+    ALIASES.put("x-EUC-CN", "GB18030");
+    ALIASES.put("GBK", "GB18030");
+    // ALIASES.put("Big5", "Big5HKSCS");
+    // ALIASES.put("TIS620", "Cp874");
+    // ALIASES.put("ISO-8859-11", "Cp874");
+
+  }
+
+  private int minConfidence;
+
+  private CharsetDetector detector;
+
+  private List<EncodingClue> clues;
+
+  public EncodingDetector(Configuration conf) {
+    minConfidence = conf.getInt(MIN_CONFIDENCE_KEY, -1);
+    detector = new CharsetDetector();
+    clues = new ArrayList<EncodingClue>();
+  }
+
+  public void autoDetectClues(Content content, boolean filter) {
+    byte[] data = content.getContent();
+
+    if (minConfidence >= 0 && DETECTABLES.contains(content.getContentType())
+        && data.length > MIN_LENGTH) {
+      CharsetMatch[] matches = null;
+
+      // do all these in a try/catch; setText and detect/detectAll
+      // will sometimes throw exceptions
+      try {
+        detector.enableInputFilter(filter);
+        // data.length > MIN_LENGTH was already checked above
+        detector.setText(data);
+        matches = detector.detectAll();
+      } catch (Exception e) {
+        LOG.debug("Exception from ICU4J (ignoring): ", e);
+      }
+
+      if (matches != null) {
+        for (CharsetMatch match : matches) {
+          addClue(match.getName(), "detect", match.getConfidence());
+        }
+      }
+    }
+
+    // add character encoding coming from HTTP response header
+    addClue(
+        parseCharacterEncoding(content.getMetadata().get(Response.CONTENT_TYPE)),
+        "header");
+  }
+
+  public void addClue(String value, String source, int confidence) {
+    if (value == null || "".equals(value)) {
+      return;
+    }
+    value = resolveEncodingAlias(value);
+    if (value != null) {
+      clues.add(new EncodingClue(value, source, confidence));
+    }
+  }
+
+  public void addClue(String value, String source) {
+    addClue(value, source, NO_THRESHOLD);
+  }
+
+  /**
+   * Guess the encoding with the previously specified list of clues.
+   * 
+   * @param content
+   *          Content instance
+   * @param defaultValue
+   *          Default encoding to return if no encoding can be detected with
+   *          enough confidence. Note that this will <b>not</b> be normalized
+   *          with {@link EncodingDetector#resolveEncodingAlias}
+   * 
+   * @return Guessed encoding or defaultValue
+   */
+  public String guessEncoding(Content content, String defaultValue) {
+    /*
+     * This algorithm could be replaced by something more sophisticated; ideally
+     * we would gather a bunch of data on where various clues (autodetect, HTTP
+     * headers, HTML meta tags, etc.) disagree, tag each with the correct
+     * answer, and use machine learning/some statistical method to generate a
+     * better heuristic.
+     */
+
+    String base = content.getBaseUrl();
+
+    if (LOG.isTraceEnabled()) {
+      findDisagreements(base, clues);
+    }
+
+    /*
+     * Go down the list of encoding "clues". Use a clue if: 1. Has a confidence
+     * value which meets our confidence threshold, OR 2. Doesn't meet the
+     * threshold, but is the best try, since nothing else is available.
+     */
+    EncodingClue defaultClue = new EncodingClue(defaultValue, "default");
+    EncodingClue bestClue = defaultClue;
+
+    for (EncodingClue clue : clues) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(base + ": charset " + clue);
+      }
+      String charset = clue.value;
+      if (minConfidence >= 0 && clue.confidence >= minConfidence) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(base + ": Choosing encoding: " + charset
+              + " with confidence " + clue.confidence);
+        }
+        return resolveEncodingAlias(charset).toLowerCase();
+      } else if (clue.confidence == NO_THRESHOLD && bestClue == defaultClue) {
+        bestClue = clue;
+      }
+    }
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(base + ": Choosing encoding: " + bestClue);
+    }
+    return bestClue.value.toLowerCase();
+  }
+
+  /** Clears all clues. */
+  public void clearClues() {
+    clues.clear();
+  }
+
+  /*
+   * Strictly for analysis, look for "disagreements." The top guess from each
+   * source is examined; if these meet the threshold and disagree, then we log
+   * the information -- useful for testing or generating training data for a
+   * better heuristic.
+   */
+  private void findDisagreements(String url, List<EncodingClue> newClues) {
+    HashSet<String> valsSeen = new HashSet<String>();
+    HashSet<String> sourcesSeen = new HashSet<String>();
+    boolean disagreement = false;
+    for (int i = 0; i < newClues.size(); i++) {
+      EncodingClue clue = newClues.get(i);
+      if (!clue.isEmpty() && !sourcesSeen.contains(clue.source)) {
+        if (valsSeen.size() > 0 && !valsSeen.contains(clue.value)
+            && clue.meetsThreshold()) {
+          disagreement = true;
+        }
+        if (clue.meetsThreshold()) {
+          valsSeen.add(clue.value);
+        }
+        sourcesSeen.add(clue.source);
+      }
+    }
+    if (disagreement) {
+      // dump all values in case of disagreement
+      StringBuilder sb = new StringBuilder();
+      sb.append("Disagreement: ").append(url).append("; ");
+      for (int i = 0; i < newClues.size(); i++) {
+        if (i > 0) {
+          sb.append(", ");
+        }
+        sb.append(newClues.get(i));
+      }
+      LOG.trace(sb.toString());
+    }
+  }
+
+  public static String resolveEncodingAlias(String encoding) {
+    try {
+      if (encoding == null || !Charset.isSupported(encoding))
+        return null;
+      String canonicalName = Charset.forName(encoding).name();
+      return ALIASES.containsKey(canonicalName) ? ALIASES.get(canonicalName)
+          : canonicalName;
+    } catch (Exception e) {
+      LOG.warn("Invalid encoding " + encoding + " detected, using default.");
+      return null;
+    }
+  }
+
+  /**
+   * Parse the character encoding from the specified content type header. If the
+   * content type is null, or there is no explicit character encoding,
+   * <code>null</code> is returned. <br />
+   * This method was copied from org.apache.catalina.util.RequestUtil, which is
+   * licensed under the Apache License, Version 2.0 (the "License").
+   * 
+   * @param contentType
+   *          a content type header
+   * @return the extracted character encoding, or <code>null</code> if none
+   */
+  public static String parseCharacterEncoding(String contentType) {
+    if (contentType == null)
+      return (null);
+    int start = contentType.indexOf("charset=");
+    if (start < 0)
+      return (null);
+    String encoding = contentType.substring(start + 8);
+    int end = encoding.indexOf(';');
+    if (end >= 0)
+      encoding = encoding.substring(0, end);
+    encoding = encoding.trim();
+    if ((encoding.length() > 2) && (encoding.startsWith("\""))
+        && (encoding.endsWith("\"")))
+      encoding = encoding.substring(1, encoding.length() - 1);
+    return (encoding.trim());
+
+  }
+
+  public static void main(String[] args) throws IOException {
+    if (args.length != 1) {
+      System.err.println("Usage: EncodingDetector <file>");
+      System.exit(1);
+    }
+
+    Configuration conf = NutchConfiguration.create();
+    // reuse the Configuration created above instead of building a second one
+    EncodingDetector detector = new EncodingDetector(conf);
+
+    // do everything as bytes; don't want any conversion
+    BufferedInputStream istr = new BufferedInputStream(new FileInputStream(
+        args[0]));
+    ByteArrayOutputStream ostr = new ByteArrayOutputStream();
+    byte[] bytes = new byte[1000];
+    // read() may return fewer bytes than requested before EOF, so loop
+    // until it returns -1 instead of treating a short read as the end
+    int len;
+    while ((len = istr.read(bytes)) != -1) {
+      ostr.write(bytes, 0, len);
+    }
+    istr.close();
+
+    byte[] data = ostr.toByteArray();
+
+    // make a fake Content
+    Content content = new Content("", "", data, "text/html", new Metadata(),
+        conf);
+
+    detector.autoDetectClues(content, true);
+    String encoding = detector.guessEncoding(content,
+        conf.get("parser.character.encoding.default"));
+    System.out.println("Guessed encoding: " + encoding);
+  }
+
+}
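
For illustration, a minimal sketch of driving the detector above, mirroring the
main() method (the URL, the "sniffed" clue source tag, and the fallback
encoding are made up):

    Configuration conf = NutchConfiguration.create();
    EncodingDetector detector = new EncodingDetector(conf);
    byte[] raw = "<html><body>caf\u00e9</body></html>"
        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
    // wrap the raw bytes in a fake Content, as main() does
    Content content = new Content("http://example.com/", "http://example.com/",
        raw, "text/html", new Metadata(), conf);
    detector.autoDetectClues(content, true);
    detector.addClue("utf-8", "sniffed");
    String encoding = detector.guessEncoding(content,
        conf.get("parser.character.encoding.default", "windows-1252"));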

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java
new file mode 100644
index 0000000..6aed8d5
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Utility methods for common filesystem operations.
+ */
+public class FSUtils {
+
+  /**
+   * Replaces the current path with the replacement path. If removeOld is
+   * true, the old data is deleted; otherwise it is kept under the name
+   * current.old.
+   * 
+   * @param fs
+   *          The FileSystem.
+   * @param current
+   *          The end path, the one being replaced.
+   * @param replacement
+   *          The path to replace with.
+   * @param removeOld
+   *          True if we are removing the current path.
+   * 
+   * @throws IOException
+   *           If an error occurs during replacement.
+   */
+  public static void replace(FileSystem fs, Path current, Path replacement,
+      boolean removeOld) throws IOException {
+
+    // rename any current path to old
+    Path old = new Path(current + ".old");
+    if (fs.exists(current)) {
+      fs.rename(current, old);
+    }
+
+    // rename the new path to current and remove the old path if needed
+    fs.rename(replacement, current);
+    if (fs.exists(old) && removeOld) {
+      fs.delete(old, true);
+    }
+  }
+
+  /**
+   * Closes a group of SequenceFile readers.
+   * 
+   * @param readers
+   *          The SequenceFile readers to close.
+   * @throws IOException
+   *           If an error occurs while closing a reader.
+   */
+  public static void closeReaders(SequenceFile.Reader[] readers)
+      throws IOException {
+
+    // loop through the readers, closing one by one
+    if (readers != null) {
+      for (int i = 0; i < readers.length; i++) {
+        SequenceFile.Reader reader = readers[i];
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Closes a group of MapFile readers.
+   * 
+   * @param readers
+   *          The MapFile readers to close.
+   * @throws IOException
+   *           If an error occurs while closing a reader.
+   */
+  public static void closeReaders(MapFile.Reader[] readers) throws IOException {
+
+    // loop through the readers closing one by one
+    if (readers != null) {
+      for (int i = 0; i < readers.length; i++) {
+        MapFile.Reader reader = readers[i];
+        if (reader != null) {
+          reader.close();
+        }
+      }
+    }
+  }
+}
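
For illustration, the intended replace() usage: swap a freshly written
directory into place and drop the previous data (the paths are made up):

    Configuration conf = NutchConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path current = new Path("crawl/crawldb/current");
    Path replacement = new Path("crawl/crawldb/new");
    // "new" becomes "current"; the previous "current" is first renamed to
    // "current.old" and then deleted because removeOld is true
    FSUtils.replace(fs, current, replacement, true);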

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java
new file mode 100644
index 0000000..63b10e2
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+// Slf4j Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of utility methods for working with gzipped data.
+ */
+public class GZIPUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(GZIPUtils.class);
+  private static final int EXPECTED_COMPRESSION_RATIO = 5;
+  private static final int BUF_SIZE = 4096;
+
+  /**
+   * Returns a gunzipped copy of the input array. If the gzipped input has been
+   * truncated or corrupted, a best-effort attempt is made to unzip as much as
+   * possible. If no data can be extracted <code>null</code> is returned.
+   */
+  public static final byte[] unzipBestEffort(byte[] in) {
+    return unzipBestEffort(in, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Returns a gunzipped copy of the input array, truncated to
+   * <code>sizeLimit</code> bytes, if necessary. If the gzipped input has been
+   * truncated or corrupted, a best-effort attempt is made to unzip as much as
+   * possible. If no data can be extracted <code>null</code> is returned.
+   */
+  public static final byte[] unzipBestEffort(byte[] in, int sizeLimit) {
+    try {
+      // decompress using GZIPInputStream
+      ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+          EXPECTED_COMPRESSION_RATIO * in.length);
+
+      GZIPInputStream inStream = new GZIPInputStream(new ByteArrayInputStream(
+          in));
+
+      byte[] buf = new byte[BUF_SIZE];
+      int written = 0;
+      while (true) {
+        try {
+          int size = inStream.read(buf);
+          if (size <= 0)
+            break;
+          if ((written + size) > sizeLimit) {
+            outStream.write(buf, 0, sizeLimit - written);
+            break;
+          }
+          outStream.write(buf, 0, size);
+          written += size;
+        } catch (Exception e) {
+          // truncated or corrupt input: stop and return what was recovered
+          break;
+        }
+      }
+      try {
+        outStream.close();
+      } catch (IOException e) {
+        // ByteArrayOutputStream.close() is a no-op; nothing to handle
+      }
+
+      return outStream.toByteArray();
+
+    } catch (IOException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Returns a gunzipped copy of the input array.
+   * 
+   * @throws IOException
+   *           if the input cannot be properly decompressed
+   */
+  public static final byte[] unzip(byte[] in) throws IOException {
+    // decompress using GZIPInputStream
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+        EXPECTED_COMPRESSION_RATIO * in.length);
+
+    GZIPInputStream inStream = new GZIPInputStream(new ByteArrayInputStream(in));
+
+    byte[] buf = new byte[BUF_SIZE];
+    while (true) {
+      int size = inStream.read(buf);
+      if (size <= 0)
+        break;
+      outStream.write(buf, 0, size);
+    }
+    outStream.close();
+
+    return outStream.toByteArray();
+  }
+
+  /**
+   * Returns a gzipped copy of the input array.
+   */
+  public static final byte[] zip(byte[] in) {
+    try {
+      // compress using GZIPOutputStream
+      ByteArrayOutputStream byteOut = new ByteArrayOutputStream(in.length
+          / EXPECTED_COMPRESSION_RATIO);
+
+      GZIPOutputStream outStream = new GZIPOutputStream(byteOut);
+
+      try {
+        outStream.write(in);
+      } catch (Exception e) {
+        LOG.error("Error writing outStream: ", e);
+      }
+
+      try {
+        outStream.close();
+      } catch (IOException e) {
+        LOG.error("Error closing outStream: ", e);
+      }
+
+      return byteOut.toByteArray();
+
+    } catch (IOException e) {
+      LOG.error("Error: ", e);
+      return null;
+    }
+  }
+
+}
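
For illustration, a round trip through the helpers above (the 1 MiB cap is an
arbitrary example value):

    byte[] original = "hello, nutch".getBytes(StandardCharsets.UTF_8);
    byte[] compressed = GZIPUtils.zip(original);  // null if compression failed
    // cap the decompressed output at 1 MiB; truncated or corrupt input
    // yields whatever could be recovered, or null
    byte[] restored = GZIPUtils.unzipBestEffort(compressed, 1024 * 1024);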

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java b/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java
new file mode 100644
index 0000000..755aad0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.GenericWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic Writable wrapper that can inject a Configuration into wrapped
+ * {@link Configurable} instances when they are deserialized.
+ */
+public abstract class GenericWritableConfigurable extends GenericWritable
+    implements Configurable {
+
+  private Configuration conf;
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte type = in.readByte();
+    Class<?> clazz = getTypes()[type];
+    try {
+      set((Writable) clazz.newInstance());
+    } catch (Exception e) {
+      throw new IOException("Cannot initialize the class: " + clazz, e);
+    }
+    Writable w = get();
+    if (w instanceof Configurable)
+      ((Configurable) w).setConf(conf);
+    w.readFields(in);
+  }
+
+}
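
A concrete subclass only has to enumerate the wrapped types; a sketch with a
made-up class name and type list:

    public class ExampleGenericWritable extends GenericWritableConfigurable {
      @SuppressWarnings("unchecked")
      private static final Class<? extends Writable>[] TYPES =
          new Class[] { org.apache.hadoop.io.Text.class,
              org.apache.nutch.crawl.CrawlDatum.class };

      @Override
      protected Class<? extends Writable>[] getTypes() {
        // readFields() uses the index into this array as the type tag
        return TYPES;
      }
    }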

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
new file mode 100644
index 0000000..6f471c1
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class HadoopFSUtil {
+
+  /**
+   * Returns a PathFilter that passes all paths through.
+   */
+  public static PathFilter getPassAllFilter() {
+    return new PathFilter() {
+      public boolean accept(Path arg0) {
+        return true;
+      }
+    };
+  }
+
+  /**
+   * Returns a PathFilter that accepts only directories.
+   */
+  public static PathFilter getPassDirectoriesFilter(final FileSystem fs) {
+    return new PathFilter() {
+      public boolean accept(final Path path) {
+        try {
+          return fs.getFileStatus(path).isDirectory();
+        } catch (IOException ioe) {
+          return false;
+        }
+      }
+
+    };
+  }
+
+  /**
+   * Turns an array of FileStatus into an array of Paths.
+   */
+  public static Path[] getPaths(FileStatus[] stats) {
+    if (stats == null) {
+      return null;
+    }
+    if (stats.length == 0) {
+      return new Path[0];
+    }
+    Path[] res = new Path[stats.length];
+    for (int i = 0; i < stats.length; i++) {
+      res[i] = stats[i].getPath();
+    }
+    return res;
+  }
+
+}
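
For illustration, listing only the segment subdirectories of a crawl (the
directory layout is made up):

    FileSystem fs = FileSystem.get(conf);
    FileStatus[] stats = fs.listStatus(new Path("crawl/segments"),
        HadoopFSUtil.getPassDirectoriesFilter(fs));
    // plain files under crawl/segments are filtered out above
    Path[] segments = HadoopFSUtil.getPaths(stats);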

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
new file mode 100644
index 0000000..656a458
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.lang.time.DateUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of Jexl utilities.
+ */
+public class JexlUtil {
+
+  public static final Logger LOG = LoggerFactory.getLogger(JexlUtil.class);
+
+  /**
+   * Matches ISO 8601 timestamps of the form yyyy-MM-dd'T'HH:mm:ss'Z'.
+   */
+  public static final Pattern datePattern = Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z");
+
+  /**
+   * Parses the given expression into a Jexl expression. ISO 8601 dates in the
+   * expression are first converted to epoch milliseconds.
+   *
+   * @param expr the Jexl expression
+   * @return parsed Jexl expression or null in case of parse error
+   */
+  public static Expression parseExpression(String expr) {
+    if (expr == null) return null;
+    
+    try {
+      // Translate every date into epoch millis; dates must be specified as
+      // 2016-03-20T00:00:00Z (matching the pattern above)
+      Matcher matcher = datePattern.matcher(expr);
+      while (matcher.find()) {
+        String date = matcher.group();
+        
+        // Parse the thing and get epoch!
+        Date parsedDate = DateUtils.parseDateStrictly(date, new String[] {"yyyy-MM-dd'T'HH:mm:ss'Z'"});
+        long time = parsedDate.getTime();
+        
+        // Replace in the original expression
+        expr = expr.replace(date, Long.toString(time));
+      }
+      
+      JexlEngine jexl = new JexlEngine();
+      jexl.setSilent(true);
+      jexl.setStrict(true);
+      return jexl.createExpression(expr);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+    }
+    
+    return null;
+  }
+}
\ No newline at end of file
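
For illustration, parsing and evaluating an expression that uses the date
rewriting above (field names and values are made up):

    // uses org.apache.commons.jexl2.{Expression, JexlContext, MapContext}
    Expression check = JexlUtil.parseExpression(
        "status == 2 && fetchTime > 2016-03-20T00:00:00Z");
    if (check != null) {
      JexlContext context = new MapContext();
      context.set("status", 2);
      context.set("fetchTime", 1458518400000L);  // one day after the cutoff
      Object result = check.evaluate(context);   // Boolean.TRUE here
    }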

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
new file mode 100644
index 0000000..7e3bb97
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Utility methods for handling application-level locking.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class LockUtil {
+
+  /**
+   * Create a lock file.
+   * 
+   * @param fs
+   *          filesystem
+   * @param lockFile
+   *          name of the lock file
+   * @param accept
+   *          if true, and the target file exists, consider it valid. If false
+   *          and the target file exists, throw an IOException.
+   * @throws IOException
+   *           if accept is false, and the target file already exists, or if
+   *           it's a directory.
+   */
+  public static void createLockFile(FileSystem fs, Path lockFile, boolean accept)
+      throws IOException {
+    if (fs.exists(lockFile)) {
+      if (!accept)
+        throw new IOException("lock file " + lockFile + " already exists.");
+      if (fs.getFileStatus(lockFile).isDirectory())
+        throw new IOException("lock file " + lockFile
+            + " already exists and is a directory.");
+      // do nothing - the file already exists.
+    } else {
+      // make sure parents exist
+      fs.mkdirs(lockFile.getParent());
+      fs.createNewFile(lockFile);
+    }
+  }
+
+  /**
+   * Remove lock file. NOTE: applications enforce the semantics of this file -
+   * this method simply removes any file with a given name.
+   * 
+   * @param fs
+   *          filesystem
+   * @param lockFile
+   *          lock file name
+   * @return false if the lock file doesn't exist, or true if it existed and
+   *         was successfully removed
+   * @throws IOException
+   *           if lock file exists but it is a directory.
+   */
+  public static boolean removeLockFile(FileSystem fs, Path lockFile)
+      throws IOException {
+    if (!fs.exists(lockFile))
+      return false;
+    if (fs.getFileStatus(lockFile).isDirectory())
+      throw new IOException("lock file " + lockFile
+          + " exists but is a directory!");
+    return fs.delete(lockFile, false);
+  }
+}
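
For illustration, the usual locking pattern around an exclusive update (the
lock path is made up):

    FileSystem fs = FileSystem.get(conf);
    Path lock = new Path("crawl/crawldb/.locked");
    LockUtil.createLockFile(fs, lock, false);  // throws if another run holds the lock
    try {
      // ... exclusive work on the crawldb ...
    } finally {
      LockUtil.removeLockFile(fs, lock);
    }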