You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:50 UTC
[34/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build
for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java b/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java
new file mode 100644
index 0000000..2e50105
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/warc/WARCExporter.java
@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.tools.warc;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+import org.apache.nutch.util.TimingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.martinkl.warc.WARCRecord;
+import com.martinkl.warc.WARCWritable;
+import com.martinkl.warc.mapred.WARCOutputFormat;
+
+/**
+ * MapReduce job to export Nutch segments as WARC files. The file format is
+ * documented in the [ISO
+ * Standard](http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf).
+ * Generates elements of type response if the configuration 'store.http.headers'
+ * was set to true during the fetching and the http headers were stored
+ * verbatim; generates elements of type 'resource' otherwise.
+ **/
+
+public class WARCExporter extends Configured implements Tool {
+
+  public static Logger LOG = LoggerFactory.getLogger(WARCExporter.class);
+
+  /** Line terminator used for WARC header lines and record separators. */
+  private static final String CRLF = "\r\n";
+  /** {@link #CRLF} as raw bytes (CR = 13, LF = 10). */
+  private static final byte[] CRLF_BYTES = { 13, 10 };
+
+  public WARCExporter() {
+    super(null);
+  }
+
+  public WARCExporter(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Acts as both mapper and reducer of the export job. The map side wraps
+   * every input value in a {@link NutchWritable}; the reduce side pairs the
+   * {@link Content} and {@link CrawlDatum} found for a URL and emits one WARC
+   * record for it.
+   */
+  public static class WARCReducer
+      implements Mapper<Text, Writable, Text, NutchWritable>,
+      Reducer<Text, NutchWritable, NullWritable, WARCWritable> {
+
+    SimpleDateFormat warcdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'",
+        Locale.ENGLISH);
+
+    {
+      // The pattern appends a literal 'Z' (UTC designator), so the formatter
+      // must render in UTC rather than in the JVM default time zone.
+      warcdf.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
+    }
+
+    @Override
+    public void configure(JobConf job) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    /** Passes the value through unchanged, wrapped in a NutchWritable. */
+    @Override
+    public void map(Text key, Writable value,
+        OutputCollector<Text, NutchWritable> output, Reporter reporter)
+        throws IOException {
+      output.collect(key, new NutchWritable(value));
+    }
+
+    /**
+     * Builds a single WARC record for the URL in <code>key</code>.
+     *
+     * Nothing is emitted (and a counter in the group "WARCExporter" is
+     * incremented) when the content or the fetch datum is missing, when the
+     * key is not a valid URI, or when the record cannot be constructed.
+     *
+     * @param key
+     *          the URL of the document
+     * @param values
+     *          wrapped Content and CrawlDatum instances collected for the URL
+     * @param output
+     *          receives the generated record
+     * @param reporter
+     *          used for the job counters
+     * @throws IOException
+     *           if the record bytes cannot be assembled
+     */
+    @Override
+    public void reduce(Text key, Iterator<NutchWritable> values,
+        OutputCollector<NullWritable, WARCWritable> output, Reporter reporter)
+        throws IOException {
+
+      Content content = null;
+      CrawlDatum cd = null;
+
+      // aggregate the values found
+      while (values.hasNext()) {
+        final Writable value = values.next().get(); // unwrap
+        if (value instanceof Content) {
+          content = (Content) value;
+        } else if (value instanceof CrawlDatum) {
+          cd = (CrawlDatum) value;
+        }
+      }
+
+      // check that we have everything we need
+      if (content == null) {
+        LOG.info("Missing content for {}", key);
+        reporter.getCounter("WARCExporter", "missing content").increment(1);
+        return;
+      }
+
+      if (cd == null) {
+        LOG.info("Missing fetch datum for {}", key);
+        reporter.getCounter("WARCExporter", "missing metadata").increment(1);
+        return;
+      }
+
+      // were the headers stored as is? Can write a response element then
+      String headersVerbatim = content.getMetadata().get("_response.headers_");
+      final boolean hasHttpHeaders = StringUtils.isNotBlank(headersVerbatim);
+      byte[] httpheaders = new byte[0];
+      if (hasHttpHeaders) {
+        // The header block must end with exactly one empty line. Only add
+        // what is missing: a block already ending in CRLF needs one more
+        // CRLF, otherwise the extra bytes would leak into the payload.
+        if (!headersVerbatim.endsWith(CRLF + CRLF)) {
+          headersVerbatim += headersVerbatim.endsWith(CRLF) ? CRLF
+              : CRLF + CRLF;
+        }
+        // explicit charset: do not depend on the platform default encoding
+        httpheaders = headersVerbatim.getBytes("UTF-8");
+      }
+
+      // read the payload once so that the declared length and the bytes
+      // actually written cannot disagree
+      byte[] payload = content.getContent();
+      if (payload == null) {
+        payload = new byte[0];
+      }
+
+      StringBuilder buffer = new StringBuilder();
+      buffer.append(WARCRecord.WARC_VERSION);
+      buffer.append(CRLF);
+
+      buffer.append("WARC-Record-ID").append(": ").append("<urn:uuid:")
+          .append(UUID.randomUUID().toString()).append(">").append(CRLF);
+
+      // length of the record block = http headers (if any) + payload
+      int contentLength = httpheaders.length + payload.length;
+
+      buffer.append("Content-Length").append(": ")
+          .append(Integer.toString(contentLength)).append(CRLF);
+
+      Date fetchedDate = new Date(cd.getFetchTime());
+      buffer.append("WARC-Date").append(": ").append(warcdf.format(fetchedDate))
+          .append(CRLF);
+
+      // 'response' if the http headers have been stored verbatim,
+      // 'resource' otherwise
+      String WARCTypeValue = hasHttpHeaders ? "response" : "resource";
+
+      buffer.append("WARC-Type").append(": ").append(WARCTypeValue)
+          .append(CRLF);
+
+      // "WARC-IP-Address" if present
+      String ip = content.getMetadata().get("_ip_");
+      if (StringUtils.isNotBlank(ip)) {
+        buffer.append("WARC-IP-Address").append(": ").append(ip).append(CRLF);
+      }
+
+      // detect if truncated only for fetch success; compare the status code
+      // itself so the check does not depend on how status names are spelled
+      if (cd.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS
+          && ParseSegment.isTruncated(content)) {
+        buffer.append("WARC-Truncated").append(": ").append("unspecified")
+            .append(CRLF);
+      }
+
+      // must be a valid URI
+      try {
+        String normalised = key.toString().replaceAll(" ", "%20");
+        URI uri = URI.create(normalised);
+        buffer.append("WARC-Target-URI").append(": ")
+            .append(uri.toASCIIString()).append(CRLF);
+      } catch (Exception e) {
+        LOG.error("Invalid URI {} ", key);
+        reporter.getCounter("WARCExporter", "invalid URI").increment(1);
+        return;
+      }
+
+      // provide a ContentType if type response
+      if (hasHttpHeaders) {
+        buffer.append("Content-Type: application/http; msgtype=response")
+            .append(CRLF);
+      }
+
+      // finished writing the WARC headers, now let's serialize it
+
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+      // store the headers
+      bos.write(buffer.toString().getBytes("UTF-8"));
+      bos.write(CRLF_BYTES);
+      // the http headers
+      bos.write(httpheaders);
+      // the binary content itself
+      bos.write(payload);
+      bos.write(CRLF_BYTES);
+      bos.write(CRLF_BYTES);
+
+      try {
+        DataInput in = new DataInputStream(
+            new ByteArrayInputStream(bos.toByteArray()));
+        WARCRecord record = new WARCRecord(in);
+        output.collect(NullWritable.get(), new WARCWritable(record));
+        reporter.getCounter("WARCExporter", "records generated").increment(1);
+      } catch (IOException exception) {
+        LOG.error("Exception when generating WARC record for {} : {}", key,
+            exception.getMessage());
+        reporter.getCounter("WARCExporter", "exception").increment(1);
+      }
+
+    }
+  }
+
+  /**
+   * Configures and runs the export job over the content and fetch data of
+   * the given segments.
+   *
+   * @param output
+   *          path of the output directory
+   * @param segments
+   *          segments to export
+   * @return 0 if the job ran, -1 if an exception was caught while running it
+   */
+  public int generateWARC(String output, List<Path> segments) {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("WARCExporter: starting at {}", sdf.format(start));
+
+    final JobConf job = new NutchJob(getConf());
+    job.setJobName("warc-exporter " + output);
+
+    for (final Path segment : segments) {
+      LOG.info("warc-exporter: adding segment: {}", segment);
+      FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+      FileInputFormat.addInputPath(job,
+          new Path(segment, CrawlDatum.FETCH_DIR_NAME));
+    }
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+
+    job.setMapperClass(WARCReducer.class);
+    job.setReducerClass(WARCReducer.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(NutchWritable.class);
+
+    FileOutputFormat.setOutputPath(job, new Path(output));
+    // using the old api
+    job.setOutputFormat(WARCOutputFormat.class);
+
+    job.setOutputKeyClass(NullWritable.class);
+    job.setOutputValueClass(WARCWritable.class);
+
+    try {
+      RunningJob rj = JobClient.runJob(job);
+      LOG.info(rj.getCounters().toString());
+      long end = System.currentTimeMillis();
+      LOG.info("WARCExporter: finished at {}, elapsed: {}", sdf.format(end),
+          TimingUtil.elapsedTime(start, end));
+    } catch (Exception e) {
+      LOG.error("Exception caught", e);
+      return -1;
+    }
+
+    return 0;
+  }
+
+  /**
+   * Command line entry point: the first argument is the output directory,
+   * the remaining ones are either segment paths or <code>-dir</code>
+   * followed by a directory whose sub-directories are used as segments.
+   *
+   * @return -1 on a usage error, otherwise the result of
+   *         {@link #generateWARC(String, List)}
+   */
+  public int run(String[] args) throws Exception {
+    final String usage = "Usage: WARCExporter <output> (<segment> ... | -dir <segments>)";
+    if (args.length < 2) {
+      System.err.println(usage);
+      return -1;
+    }
+
+    final List<Path> segments = new ArrayList<Path>();
+
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-dir")) {
+        // -dir needs a value; fail with the usage message rather than with
+        // an ArrayIndexOutOfBoundsException
+        if (++i >= args.length) {
+          System.err.println(usage);
+          return -1;
+        }
+        Path dir = new Path(args[i]);
+        FileSystem fs = dir.getFileSystem(getConf());
+        FileStatus[] fstats = fs.listStatus(dir,
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+        Path[] files = HadoopFSUtil.getPaths(fstats);
+        for (Path p : files) {
+          segments.add(p);
+        }
+      } else {
+        segments.add(new Path(args[i]));
+      }
+    }
+
+    return generateWARC(args[0], segments);
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new WARCExporter(), args);
+    System.exit(res);
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java b/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
new file mode 100644
index 0000000..44e1a94
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/tools/warc/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * Tools to import / export between Nutch segments and
+ * <a href="http://bibnum.bnf.fr/warc/WARC_ISO_28500_version1_latestdraft.pdf">
+ * WARC archives</a>.
+ */
+package org.apache.nutch.tools.warc;
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java b/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
new file mode 100644
index 0000000..593d590
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/CommandRunner.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Adopted by John Xing for Nutch Project from
+ * http://blog.fivesight.com/prb/space/Call+an+External+Command+from+Java/,
+ * which explains the code in detail.
+ * [Original author is moving his site to http://mult.ifario.us/ -peb]
+ *
+ * Comments by John Xing on 20040621:
+ * (1) EDU.oswego.cs.dl.util.concurrent.* is in j2sdk 1.5 now.
+ * Modifications are needed if we move to j2sdk 1.5.
+ * (2) The original looks good, not much to change.
+ *
+ * This code is in the public domain and comes with no warranty.
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.InterruptedIOException;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Runs an external command, feeding it an optional input stream and copying
+ * its standard output and standard error to caller supplied streams, with a
+ * timeout expressed in seconds.
+ */
+public class CommandRunner {
+
+  private boolean _waitForExit = true;
+  private String _command;
+  /** Timeout in seconds; 0 makes the wait for the I/O threads unbounded. */
+  private int _timeout = 10;
+
+  private InputStream _stdin;
+  private OutputStream _stdout;
+  private OutputStream _stderr;
+
+  private static final int BUF = 4096;
+
+  private int _xit;
+
+  // written by the pump threads, read by the caller's thread
+  private volatile Throwable _thrownError;
+
+  private CyclicBarrier _barrier;
+
+  public int getExitValue() {
+    return _xit;
+  }
+
+  public void setCommand(String s) {
+    _command = s;
+  }
+
+  public String getCommand() {
+    return _command;
+  }
+
+  public void setInputStream(InputStream is) {
+    _stdin = is;
+  }
+
+  public void setStdOutputStream(OutputStream os) {
+    _stdout = os;
+  }
+
+  public void setStdErrorStream(OutputStream os) {
+    _stderr = os;
+  }
+
+  public void evaluate() throws IOException {
+    this.exec();
+  }
+
+  /**
+   * Starts the command, pumps its streams on daemon threads and waits for
+   * them (and, if {@link #getWaitForExit()} is set, for the process) until
+   * the timeout expires.
+   *
+   * @return process exit value (return code) or -1 if timed out or if the
+   *         calling thread was interrupted while waiting.
+   * @throws IOException
+   *           if the process cannot be started
+   */
+  public int exec() throws IOException {
+    Process proc = Runtime.getRuntime().exec(_command);
+    // parties: this thread + stdout puller + stderr puller (+ stdin pusher)
+    _barrier = new CyclicBarrier(3 + ((_stdin != null) ? 1 : 0));
+
+    PullerThread so = new PullerThread("STDOUT", proc.getInputStream(), _stdout);
+    so.setDaemon(true);
+    so.start();
+
+    PullerThread se = new PullerThread("STDERR", proc.getErrorStream(), _stderr);
+    se.setDaemon(true);
+    se.start();
+
+    PusherThread si = null;
+    if (_stdin != null) {
+      si = new PusherThread("STDIN", _stdin, proc.getOutputStream());
+      si.setDaemon(true);
+      si.start();
+    }
+
+    boolean timedOut = false;
+    // 1000L: do the multiplication in long so large timeouts cannot overflow
+    long end = System.currentTimeMillis() + _timeout * 1000L;
+
+    // wait until all pump threads are done, or until the timeout
+    try {
+      if (_timeout == 0) {
+        _barrier.await();
+      } else {
+        _barrier.await(_timeout, TimeUnit.SECONDS);
+      }
+    } catch (TimeoutException ex) {
+      timedOut = true;
+    } catch (BrokenBarrierException bbe) {
+      /* IGNORE */
+    } catch (InterruptedException e) {
+      // keep the interrupt visible to the wait loop below and to the caller
+      Thread.currentThread().interrupt();
+    }
+
+    // tell the io threads we are finished
+    if (si != null) {
+      si.interrupt();
+    }
+    so.interrupt();
+    se.interrupt();
+
+    _xit = -1;
+
+    if (!timedOut) {
+      if (_waitForExit) {
+        do {
+          try {
+            Thread.sleep(1000);
+            _xit = proc.exitValue();
+          } catch (InterruptedException ie) {
+            // Stop waiting on an interrupt for this thread. Catching the
+            // exception clears the flag, so it is restored for the caller.
+            Thread.currentThread().interrupt();
+            break;
+          } catch (IllegalThreadStateException iltse) {
+            // process still running: re-check the deadline and poll again
+            continue;
+          }
+          break;
+        } while (!(timedOut = (System.currentTimeMillis() > end)));
+      } else {
+        try {
+          _xit = proc.exitValue();
+        } catch (IllegalThreadStateException iltse) {
+          timedOut = true;
+        }
+      }
+    }
+
+    if (_waitForExit) {
+      proc.destroy();
+    }
+    return _xit;
+  }
+
+  /**
+   * @return the last error recorded by one of the stream pumping threads, or
+   *         <code>null</code> if none was recorded
+   */
+  public Throwable getThrownError() {
+    return _thrownError;
+  }
+
+  /**
+   * Copies one stream into another until end of input or interruption, then
+   * closes one end and waits on the shared barrier.
+   */
+  private class PumperThread extends Thread {
+
+    private OutputStream _os;
+    private InputStream _is;
+
+    // true: close the input side when done; false: close the output side
+    private boolean _closeInput;
+
+    protected PumperThread(String name, InputStream is, OutputStream os,
+        boolean closeInput) {
+      super(name);
+      _is = is;
+      _os = os;
+      _closeInput = closeInput;
+    }
+
+    @Override
+    public void run() {
+      try {
+        byte[] buf = new byte[BUF];
+        int read = 0;
+        while (!isInterrupted() && (read = _is.read(buf)) != -1) {
+          if (read == 0)
+            continue;
+          _os.write(buf, 0, read);
+          _os.flush();
+        }
+      } catch (InterruptedIOException iioe) {
+        // ignored
+      } catch (Throwable t) {
+        _thrownError = t;
+      } finally {
+        try {
+          if (_closeInput) {
+            _is.close();
+          } else {
+            _os.close();
+          }
+        } catch (IOException ioe) {
+          /* IGNORE */
+        }
+      }
+      try {
+        _barrier.await();
+      } catch (InterruptedException ie) {
+        /* IGNORE */
+      } catch (BrokenBarrierException bbe) {
+        /* IGNORE */
+      }
+    }
+  }
+
+  /** Feeds the caller's input to the process; closes the process side. */
+  private class PusherThread extends PumperThread {
+    PusherThread(String name, InputStream is, OutputStream os) {
+      super(name, is, os, false);
+    }
+  }
+
+  /** Drains a process stream into a caller stream; closes the process side. */
+  private class PullerThread extends PumperThread {
+    PullerThread(String name, InputStream is, OutputStream os) {
+      super(name, is, os, true);
+    }
+  }
+
+  public int getTimeout() {
+    return _timeout;
+  }
+
+  public void setTimeout(int timeout) {
+    _timeout = timeout;
+  }
+
+  public boolean getWaitForExit() {
+    return _waitForExit;
+  }
+
+  public void setWaitForExit(boolean waitForExit) {
+    _waitForExit = waitForExit;
+  }
+
+  public static void main(String[] args) throws Exception {
+    String commandPath = null;
+    String filePath = null;
+    int timeout = 10;
+
+    String usage = "Usage: CommandRunner [-timeout timeoutSecs] commandPath filePath";
+
+    if (args.length < 2) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    for (int i = 0; i < args.length; i++) {
+      if (args[i].equals("-timeout")) {
+        // -timeout must be followed by its value
+        if (i + 1 >= args.length) {
+          System.err.println(usage);
+          System.exit(-1);
+        }
+        timeout = Integer.parseInt(args[++i]);
+      } else if (i != args.length - 2) {
+        System.err.println(usage);
+        System.exit(-1);
+      } else {
+        commandPath = args[i];
+        filePath = args[++i];
+      }
+    }
+
+    // e.g. "-timeout 5" alone passes the length check but names no command
+    if (commandPath == null || filePath == null) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+
+    CommandRunner cr = new CommandRunner();
+
+    InputStream input = new java.io.FileInputStream(filePath);
+    try {
+      cr.setCommand(commandPath);
+      cr.setInputStream(input);
+      cr.setStdErrorStream(System.err);
+      cr.setStdOutputStream(System.out);
+
+      cr.setTimeout(timeout);
+
+      cr.evaluate();
+    } finally {
+      input.close();
+    }
+
+    System.err.println("output value: " + cr.getExitValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java b/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java
new file mode 100644
index 0000000..8aafe59
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/CrawlCompletionStats.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.text.SimpleDateFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.TimingUtil;
+import org.apache.nutch.util.URLUtil;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.MissingOptionException;
+
+/**
+ * Extracts some simple crawl completion stats from the crawldb
+ *
+ * Stats will be sorted by host/domain and will be of the form:
+ * 1 www.spitzer.caltech.edu FETCHED
+ * 50 www.spitzer.caltech.edu UNFETCHED
+ *
+ */
+public class CrawlCompletionStats extends Configured implements Tool {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(CrawlCompletionStats.class);
+
+  private static final int MODE_HOST = 1;
+  private static final int MODE_DOMAIN = 2;
+
+  /** Prints the command line help for this tool. */
+  private static void printUsage(Options options) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("CrawlCompletionStats", options, true);
+  }
+
+  /**
+   * Parses the command line, then configures and runs the statistics job
+   * over the <code>crawldb/current</code> directory of every input crawl
+   * directory.
+   *
+   * @return 0 if the job succeeded, 1 if only the help was printed (missing
+   *         option, help requested or unknown mode), -1 if the job failed
+   */
+  public int run(String[] args) throws Exception {
+    Option helpOpt = new Option("h", "help", false, "Show this message");
+    Option inDirs = OptionBuilder
+        .withArgName("inputDirs")
+        .isRequired()
+        .withDescription("Comma separated list of crawl directories (e.g., \"./crawl1,./crawl2\")")
+        .hasArgs()
+        .create("inputDirs");
+    Option outDir = OptionBuilder
+        .withArgName("outputDir")
+        .isRequired()
+        .withDescription("Output directory where results should be dumped")
+        .hasArgs()
+        .create("outputDir");
+    Option modeOpt = OptionBuilder
+        .withArgName("mode")
+        .isRequired()
+        .withDescription("Set statistics gathering mode (by 'host' or by 'domain')")
+        .hasArgs()
+        .create("mode");
+    Option numReducers = OptionBuilder
+        .withArgName("numReducers")
+        .withDescription("Optional number of reduce jobs to use. Defaults to 1")
+        .hasArgs()
+        .create("numReducers");
+
+    Options options = new Options();
+    options.addOption(helpOpt);
+    options.addOption(inDirs);
+    options.addOption(outDir);
+    options.addOption(modeOpt);
+    options.addOption(numReducers);
+
+    CommandLineParser parser = new GnuParser();
+    CommandLine cli;
+
+    try {
+      cli = parser.parse(options, args);
+    } catch (MissingOptionException e) {
+      printUsage(options);
+      return 1;
+    }
+
+    if (cli.hasOption("help")) {
+      printUsage(options);
+      return 1;
+    }
+
+    String inputDir = cli.getOptionValue("inputDirs");
+    String outputDir = cli.getOptionValue("outputDir");
+
+    int numOfReducers = 1;
+    if (cli.hasOption("numReducers")) {
+      // take the value of the parsed option, not a fixed position in args:
+      // options may be given in any order
+      numOfReducers = Integer.parseInt(cli.getOptionValue("numReducers"));
+    }
+
+    // reject unknown modes up front instead of running a job whose mapper
+    // would emit empty host/domain names
+    int mode;
+    String jobName;
+    String modeValue = cli.getOptionValue("mode");
+    if ("host".equals(modeValue)) {
+      jobName = "Host CrawlCompletionStats";
+      mode = MODE_HOST;
+    } else if ("domain".equals(modeValue)) {
+      jobName = "Domain CrawlCompletionStats";
+      mode = MODE_DOMAIN;
+    } else {
+      System.err.println("Unknown mode '" + modeValue
+          + "', expected 'host' or 'domain'");
+      printUsage(options);
+      return 1;
+    }
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("CrawlCompletionStats: starting at {}", sdf.format(start));
+
+    Configuration conf = getConf();
+    conf.setInt("domain.statistics.mode", mode);
+    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+
+    Job job = Job.getInstance(conf, jobName);
+    job.setJarByClass(CrawlCompletionStats.class);
+
+    String[] inputDirsSpecs = inputDir.split(",");
+    for (int i = 0; i < inputDirsSpecs.length; i++) {
+      // Build the path with Hadoop's Path: java.io.File would collapse the
+      // "//" of URIs such as hdfs://host/dir and use the local separator.
+      Path completeInputPath = new Path(new Path(inputDirsSpecs[i]),
+          "crawldb/current");
+      FileInputFormat.addInputPath(job, completeInputPath);
+    }
+
+    job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileOutputFormat.setOutputPath(job, new Path(outputDir));
+    job.setOutputFormatClass(TextOutputFormat.class);
+
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(LongWritable.class);
+    // NOTE(review): the reducer emits (LongWritable, Text) while the classes
+    // declared here are (Text, LongWritable); left unchanged - confirm that
+    // the output format in use does not rely on them.
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(LongWritable.class);
+
+    job.setMapperClass(CrawlCompletionStatsMapper.class);
+    job.setReducerClass(CrawlCompletionStatsReducer.class);
+    job.setCombinerClass(CrawlCompletionStatsCombiner.class);
+    job.setNumReduceTasks(numOfReducers);
+
+    boolean success = job.waitForCompletion(true);
+
+    long end = System.currentTimeMillis();
+    if (!success) {
+      LOG.error("CrawlCompletionStats: job failed at {}, elapsed: {}",
+          sdf.format(end), TimingUtil.elapsedTime(start, end));
+      return -1;
+    }
+    LOG.info("CrawlCompletionStats: finished at {}, elapsed: {}",
+        sdf.format(end), TimingUtil.elapsedTime(start, end));
+    return 0;
+  }
+
+  /**
+   * Emits <code>("&lt;host or domain&gt; FETCHED|UNFETCHED", 1)</code> for
+   * every crawldb entry.
+   */
+  static class CrawlCompletionStatsMapper extends
+      Mapper<Text, CrawlDatum, Text, LongWritable> {
+    int mode = 0;
+
+    @Override
+    public void setup(Context context) {
+      mode = context.getConfiguration().getInt("domain.statistics.mode", MODE_DOMAIN);
+    }
+
+    @Override
+    public void map(Text urlText, CrawlDatum datum, Context context)
+        throws IOException, InterruptedException {
+
+      URL url;
+      try {
+        url = new URL(urlText.toString());
+      } catch (java.net.MalformedURLException e) {
+        // one unparsable entry should not fail the whole task: count it
+        // and move on
+        LOG.warn("Skipping malformed URL {}", urlText);
+        context.getCounter("CrawlCompletionStats", "malformed URL")
+            .increment(1);
+        return;
+      }
+
+      String out = "";
+      switch (mode) {
+      case MODE_HOST:
+        out = url.getHost();
+        break;
+      case MODE_DOMAIN:
+        out = URLUtil.getDomainName(url);
+        break;
+      }
+
+      if (datum.getStatus() == CrawlDatum.STATUS_DB_FETCHED
+          || datum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
+        context.write(new Text(out + " FETCHED"), new LongWritable(1));
+      } else {
+        context.write(new Text(out + " UNFETCHED"), new LongWritable(1));
+      }
+    }
+  }
+
+  /** Sums the counts of a key and writes them as (total, key). */
+  static class CrawlCompletionStatsReducer extends
+      Reducer<Text, LongWritable, LongWritable, Text> {
+    @Override
+    public void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
+      long total = 0;
+
+      for (LongWritable val : values) {
+        total += val.get();
+      }
+
+      context.write(new LongWritable(total), key);
+    }
+  }
+
+  /** Pre-aggregates the counts of a key, keeping the (key, total) order. */
+  public static class CrawlCompletionStatsCombiner extends
+      Reducer<Text, LongWritable, Text, LongWritable> {
+    @Override
+    public void reduce(Text key, Iterable<LongWritable> values, Context context)
+        throws IOException, InterruptedException {
+      long total = 0;
+
+      for (LongWritable val : values) {
+        total += val.get();
+      }
+      context.write(key, new LongWritable(total));
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(NutchConfiguration.create(), new CrawlCompletionStats(), args);
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java
new file mode 100644
index 0000000..5863522
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/DeflateUtils.java
@@ -0,0 +1,140 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.zip.Inflater;
+import java.util.zip.InflaterInputStream;
+import java.util.zip.DeflaterOutputStream;
+
+// Slf4j Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of utility methods for working on deflated data.
+ */
+public class DeflateUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DeflateUtils.class);
+  /** Guess used only to pre-size the output buffers. */
+  private static final int EXPECTED_COMPRESSION_RATIO = 5;
+  private static final int BUF_SIZE = 4096;
+
+  /**
+   * Returns an inflated copy of the input array. If the deflated input has been
+   * truncated or corrupted, a best-effort attempt is made to inflate as much as
+   * possible. If no data can be extracted an empty array is returned.
+   */
+  public static final byte[] inflateBestEffort(byte[] in) {
+    return inflateBestEffort(in, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Returns an inflated copy of the input array, truncated to
+   * <code>sizeLimit</code> bytes, if necessary. If the deflated input has been
+   * truncated or corrupted, a best-effort attempt is made to inflate as much as
+   * possible. If no data can be extracted an empty array is returned; the
+   * result is never <code>null</code>.
+   *
+   * The input is read as a raw deflate stream, i.e. without zlib header.
+   */
+  public static final byte[] inflateBestEffort(byte[] in, int sizeLimit) {
+    // decompress using InflaterInputStream
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+        EXPECTED_COMPRESSION_RATIO * in.length);
+
+    // "true" because HTTP does not provide zlib headers
+    Inflater inflater = new Inflater(true);
+    InflaterInputStream inStream = new InflaterInputStream(
+        new ByteArrayInputStream(in), inflater);
+
+    byte[] buf = new byte[BUF_SIZE];
+    int written = 0;
+    try {
+      while (true) {
+        int size = inStream.read(buf);
+        if (size <= 0)
+          break;
+        // compare against the remaining room: "written + size" could
+        // overflow when sizeLimit is close to Integer.MAX_VALUE
+        if (size > sizeLimit - written) {
+          outStream.write(buf, 0, sizeLimit - written);
+          break;
+        }
+        outStream.write(buf, 0, size);
+        written += size;
+      }
+    } catch (Exception e) {
+      // best effort: keep whatever has been inflated so far
+      LOG.info("Caught Exception in inflateBestEffort", e);
+    } finally {
+      // an Inflater handed to InflaterInputStream is not released by the
+      // stream, so free its native resources explicitly
+      inflater.end();
+    }
+
+    return outStream.toByteArray();
+  }
+
+  /**
+   * Returns an inflated copy of the input array. Unlike
+   * {@link #inflateBestEffort(byte[])} this uses the default inflater of
+   * {@link InflaterInputStream}.
+   *
+   * @throws IOException
+   *           if the input cannot be properly decompressed
+   */
+  public static final byte[] inflate(byte[] in) throws IOException {
+    // decompress using InflaterInputStream
+    ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+        EXPECTED_COMPRESSION_RATIO * in.length);
+
+    InflaterInputStream inStream = new InflaterInputStream(
+        new ByteArrayInputStream(in));
+
+    try {
+      byte[] buf = new byte[BUF_SIZE];
+      while (true) {
+        int size = inStream.read(buf);
+        if (size <= 0)
+          break;
+        outStream.write(buf, 0, size);
+      }
+    } finally {
+      // closing the stream releases the inflater it created, also when the
+      // input turns out to be corrupt
+      inStream.close();
+    }
+
+    return outStream.toByteArray();
+  }
+
+  /**
+   * Returns a deflated copy of the input array. Errors while compressing or
+   * closing are logged and whatever has been produced so far is returned.
+   */
+  public static final byte[] deflate(byte[] in) {
+    // compress using DeflaterOutputStream
+    ByteArrayOutputStream byteOut = new ByteArrayOutputStream(in.length
+        / EXPECTED_COMPRESSION_RATIO);
+
+    DeflaterOutputStream outStream = new DeflaterOutputStream(byteOut);
+
+    try {
+      outStream.write(in);
+    } catch (Exception e) {
+      LOG.error("Error compressing: ", e);
+    }
+
+    try {
+      // close() also finishes the compressed stream
+      outStream.close();
+    } catch (IOException e) {
+      LOG.error("Error closing: ", e);
+    }
+
+    return byteOut.toByteArray();
+  }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java
new file mode 100644
index 0000000..9595bf4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/DomUtil.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerConfigurationException;
+import javax.xml.transform.TransformerException;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.xerces.parsers.DOMParser;
+import org.w3c.dom.Element;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+
+// Slf4j Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+ public class DomUtil {
+
+   private final static Logger LOG = LoggerFactory.getLogger(DomUtil.class);
+
+   /**
+    * Parses the given stream as UTF-8 encoded XML and returns the root
+    * element of the resulting document.
+    *
+    * @param is
+    *          the stream to read the XML document from
+    * @return the document's root element, or <code>null</code> if the stream
+    *         could not be read or parsed (the error is logged)
+    */
+   public static Element getDom(InputStream is) {
+     DOMParser parser = new DOMParser();
+     try {
+       InputSource input = new InputSource(is);
+       input.setEncoding("UTF-8");
+       parser.parse(input);
+       // The document element is the only Element child a Document can have,
+       // so ask for it directly instead of scanning the child list with an
+       // unbounded index.
+       return parser.getDocument().getDocumentElement();
+     } catch (SAXException e) {
+       LOG.error("Error: ", e);
+     } catch (IOException e) {
+       // also covers FileNotFoundException
+       LOG.error("Error: ", e);
+     }
+     return null;
+   }
+
+   /**
+    * Serializes the given element, indented, to the output stream and
+    * flushes the stream. Errors are logged, not thrown.
+    *
+    * @param os
+    *          the stream to write the XML to; it is flushed but not closed
+    * @param e
+    *          the element to serialize
+    */
+   public static void saveDom(OutputStream os, Element e) {
+     try {
+       Transformer transformer = TransformerFactory.newInstance()
+           .newTransformer();
+       transformer.setOutputProperty("indent", "yes");
+       transformer.transform(new DOMSource(e), new StreamResult(os));
+       os.flush();
+     } catch (IOException ioe) {
+       // also covers UnsupportedEncodingException
+       LOG.error("Error: ", ioe);
+     } catch (TransformerException te) {
+       // also covers TransformerConfigurationException
+       LOG.error("Error: ", te);
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java
new file mode 100644
index 0000000..9ed3e75
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/DumpFileUtil.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+ /**
+  * Helpers for naming and laying out files written when dumping crawled
+  * content to a local directory tree.
+  */
+ public class DumpFileUtil {
+   private static final Logger LOG = LoggerFactory.getLogger(DumpFileUtil.class
+       .getName());
+
+   private final static String DIR_PATTERN = "%s/%s/%s";
+   private final static String FILENAME_PATTERN = "%s_%s.%s";
+   private final static int MAX_LENGTH_OF_FILENAME = 32;
+   private final static int MAX_LENGTH_OF_EXTENSION = 5;
+
+   /**
+    * Returns the MD5 digest of the URL as a lower-case hex string.
+    *
+    * @param url
+    *          the URL to hash
+    * @return two hex digits per digest byte
+    */
+   public static String getUrlMD5(String url) {
+     byte[] digest = MD5Hash.digest(url).getDigest();
+
+     StringBuilder hex = new StringBuilder(digest.length * 2);
+     for (byte b : digest) {
+       hex.append(String.format("%02x", b & 0xff));
+     }
+
+     return hex.toString();
+   }
+
+   /**
+    * Builds the path <code>basePath/XX/YY</code>, where <code>XX</code> is
+    * made of characters 0 and 8 of <code>md5</code> and <code>YY</code> of
+    * characters 16 and 24, and optionally creates that directory.
+    *
+    * @param basePath
+    *          the root directory
+    * @param md5
+    *          a string of at least 25 characters
+    * @param makeDir
+    *          whether to create the directory on disk
+    * @return the directory path, or <code>null</code> if <code>makeDir</code>
+    *         is set and the directory could not be created
+    */
+   public static String createTwoLevelsDirectory(String basePath, String md5, boolean makeDir) {
+     String firstLevelDirName = String.valueOf(new char[] { md5.charAt(0), md5.charAt(8) });
+     String secondLevelDirName = String.valueOf(new char[] { md5.charAt(16), md5.charAt(24) });
+
+     String fullDirPath = String.format(DIR_PATTERN, basePath, firstLevelDirName, secondLevelDirName);
+
+     if (makeDir && !makeDirectory(fullDirPath)) {
+       return null;
+     }
+
+     return fullDirPath;
+   }
+
+   /**
+    * Same as {@link #createTwoLevelsDirectory(String, String, boolean)} with
+    * <code>makeDir</code> set to <code>true</code>.
+    */
+   public static String createTwoLevelsDirectory(String basePath, String md5) {
+     return createTwoLevelsDirectory(basePath, md5, true);
+   }
+
+   /**
+    * Builds a file name of the form <code>md5_baseName.extension</code>.
+    * The base name and the extension are first truncated to their maximum
+    * lengths, then stripped of question marks.
+    *
+    * @param md5
+    *          prefix of the file name
+    * @param fileBaseName
+    *          base name, truncated to 32 characters
+    * @param fileExtension
+    *          extension, truncated to 5 characters
+    * @return the assembled file name
+    */
+   public static String createFileName(String md5, String fileBaseName, String fileExtension) {
+     if (fileBaseName.length() > MAX_LENGTH_OF_FILENAME) {
+       LOG.info("File name is too long. Truncated to {} characters.", MAX_LENGTH_OF_FILENAME);
+       fileBaseName = StringUtils.substring(fileBaseName, 0, MAX_LENGTH_OF_FILENAME);
+     }
+
+     fileExtension = truncateExtension(fileExtension);
+
+     // Added to prevent FileNotFoundException (Invalid Argument) - in *nix environment
+     fileBaseName = fileBaseName.replace("?", "");
+     fileExtension = fileExtension.replace("?", "");
+
+     return String.format(FILENAME_PATTERN, md5, fileBaseName, fileExtension);
+   }
+
+   /**
+    * Builds the path
+    * <code>basePath/reverseKey/sha1(urlString)/epochScrapeTime.extension</code>
+    * and optionally creates the containing directory.
+    *
+    * @param basePath
+    *          the root directory
+    * @param reverseKey
+    *          first directory level below the root
+    * @param urlString
+    *          URL whose SHA-1 hex digest names the second directory level
+    * @param epochScrapeTime
+    *          base name of the file
+    * @param fileExtension
+    *          extension, truncated to 5 characters
+    * @param makeDir
+    *          whether to create the containing directory on disk
+    * @return the full file path, or <code>null</code> if <code>makeDir</code>
+    *         is set and the directory could not be created
+    */
+   public static String createFileNameFromUrl(String basePath, String reverseKey, String urlString, String epochScrapeTime, String fileExtension, boolean makeDir) {
+     String fullDirPath = basePath + File.separator + reverseKey + File.separator + DigestUtils.sha1Hex(urlString);
+
+     if (makeDir && !makeDirectory(fullDirPath)) {
+       // Bail out instead of concatenating onto a null directory, which
+       // would yield a bogus relative path beginning with the text "null".
+       return null;
+     }
+
+     fileExtension = truncateExtension(fileExtension);
+
+     return fullDirPath + File.separator + epochScrapeTime + "." + fileExtension;
+   }
+
+   /**
+    * Renders the per-MIME-type counts, and the filtered counts if there are
+    * any, as a human-readable report.
+    *
+    * @param typeCounts
+    *          count per MIME type for everything seen
+    * @param filteredCounts
+    *          count per MIME type for the filtered subset; may be empty
+    * @return the report text
+    */
+   public static String displayFileTypes(Map<String, Integer> typeCounts, Map<String, Integer> filteredCounts) {
+     StringBuilder builder = new StringBuilder();
+     // print total stats
+     builder.append("\nTOTAL Stats:\n");
+     int total = appendCounts(builder, typeCounts);
+     builder.append("Total count: " + total + "\n");
+     // filtered types stats
+     if (!filteredCounts.isEmpty()) {
+       builder.append("\nFILTERED Stats:\n");
+       int filteredTotal = appendCounts(builder, filteredCounts);
+       builder.append("Total filtered count: " + filteredTotal + "\n");
+     }
+     return builder.toString();
+   }
+
+   /** Creates the directory (and parents); logs and returns false on failure. */
+   private static boolean makeDirectory(String dirPath) {
+     try {
+       FileUtils.forceMkdir(new File(dirPath));
+       return true;
+     } catch (IOException e) {
+       LOG.error("Failed to create dir: {}", dirPath, e);
+       return false;
+     }
+   }
+
+   /** Cuts the extension down to the maximum allowed length, logging if it does. */
+   private static String truncateExtension(String fileExtension) {
+     if (fileExtension.length() > MAX_LENGTH_OF_EXTENSION) {
+       LOG.info("File extension is too long. Truncated to {} characters.", MAX_LENGTH_OF_EXTENSION);
+       return StringUtils.substring(fileExtension, 0, MAX_LENGTH_OF_EXTENSION);
+     }
+     return fileExtension;
+   }
+
+   /** Appends one bracketed list of type/count entries and returns the sum of the counts. */
+   private static int appendCounts(StringBuilder builder, Map<String, Integer> counts) {
+     int sum = 0;
+     builder.append("[\n");
+     for (Map.Entry<String, Integer> entry : counts.entrySet()) {
+       builder.append(" {\"mimeType\":\"");
+       builder.append(entry.getKey());
+       builder.append("\",\"count\":\"");
+       builder.append(entry.getValue());
+       builder.append("\"}\n");
+       sum += entry.getValue();
+     }
+     builder.append("]\n");
+     return sum;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java b/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java
new file mode 100644
index 0000000..4e62dd3
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/EncodingDetector.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+ import java.io.BufferedInputStream;
+ import java.io.ByteArrayOutputStream;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.nio.charset.Charset;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Locale;
+
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.nutch.metadata.Metadata;
+ import org.apache.nutch.net.protocols.Response;
+ import org.apache.nutch.protocol.Content;
+ import org.apache.nutch.util.NutchConfiguration;
+
+ import com.ibm.icu.text.CharsetDetector;
+ import com.ibm.icu.text.CharsetMatch;
+
+/**
+ * A simple class for detecting character encodings.
+ *
+ * <p>
+ * Broadly this encompasses two functions, which are distinctly separate:
+ *
+ * <ol>
+ * <li>Auto detecting a set of "clues" from input text.</li>
+ * <li>Taking a set of clues and making a "best guess" as to the "real"
+ * encoding.</li>
+ * </ol>
+ * </p>
+ *
+ * <p>
+ * A caller will often have some extra information about what the encoding might
+ * be (e.g. from the HTTP header or HTML meta-tags, often wrong but still
+ * potentially useful clues). The types of clues may differ from caller to
+ * caller. Thus a typical calling sequence is:
+ * <ul>
+ * <li>Run step (1) to generate a set of auto-detected clues;</li>
+ * <li>Combine these clues with the caller-dependent "extra clues" available;</li>
+ * <li>Run step (2) to guess what the most probable answer is.</li>
+ * </p>
+ */
+public class EncodingDetector {
+
+ private class EncodingClue {
+ private String value;
+ private String source;
+ private int confidence;
+
+ // Constructor for clues with no confidence values (ignore thresholds)
+ public EncodingClue(String value, String source) {
+ this(value, source, NO_THRESHOLD);
+ }
+
+ public EncodingClue(String value, String source, int confidence) {
+ this.value = value.toLowerCase();
+ this.source = source;
+ this.confidence = confidence;
+ }
+
+ public String getSource() {
+ return source;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public String toString() {
+ return value + " (" + source
+ + ((confidence >= 0) ? ", " + confidence + "% confidence" : "") + ")";
+ }
+
+ public boolean isEmpty() {
+ return (value == null || "".equals(value));
+ }
+
+ public boolean meetsThreshold() {
+ return (confidence < 0 || (minConfidence >= 0 && confidence >= minConfidence));
+ }
+ }
+
+ public static final Logger LOG = LoggerFactory
+ .getLogger(EncodingDetector.class);
+
+ public static final int NO_THRESHOLD = -1;
+
+ public static final String MIN_CONFIDENCE_KEY = "encodingdetector.charset.min.confidence";
+
+ private static final HashMap<String, String> ALIASES = new HashMap<String, String>();
+
+ private static final HashSet<String> DETECTABLES = new HashSet<String>();
+
+ // CharsetDetector will die without a minimum amount of data.
+ private static final int MIN_LENGTH = 4;
+
+ static {
+ DETECTABLES.add("text/html");
+ DETECTABLES.add("text/plain");
+ DETECTABLES.add("text/richtext");
+ DETECTABLES.add("text/rtf");
+ DETECTABLES.add("text/sgml");
+ DETECTABLES.add("text/tab-separated-values");
+ DETECTABLES.add("text/xml");
+ DETECTABLES.add("application/rss+xml");
+ DETECTABLES.add("application/xhtml+xml");
+ /*
+ * the following map is not an alias mapping table, but maps character
+ * encodings which are often used in mislabelled documents to their correct
+ * encodings. For instance, there are a lot of documents labelled
+ * 'ISO-8859-1' which contain characters not covered by ISO-8859-1 but
+ * covered by windows-1252. Because windows-1252 is a superset of ISO-8859-1
+ * (sharing code points for the common part), it's better to treat
+ * ISO-8859-1 as synonymous with windows-1252 than to reject, as invalid,
+ * documents labelled as ISO-8859-1 that have characters outside ISO-8859-1.
+ */
+ ALIASES.put("ISO-8859-1", "windows-1252");
+ ALIASES.put("EUC-KR", "x-windows-949");
+ ALIASES.put("x-EUC-CN", "GB18030");
+ ALIASES.put("GBK", "GB18030");
+ // ALIASES.put("Big5", "Big5HKSCS");
+ // ALIASES.put("TIS620", "Cp874");
+ // ALIASES.put("ISO-8859-11", "Cp874");
+
+ }
+
+ private int minConfidence;
+
+ private CharsetDetector detector;
+
+ private List<EncodingClue> clues;
+
+ public EncodingDetector(Configuration conf) {
+ minConfidence = conf.getInt(MIN_CONFIDENCE_KEY, -1);
+ detector = new CharsetDetector();
+ clues = new ArrayList<EncodingClue>();
+ }
+
+ public void autoDetectClues(Content content, boolean filter) {
+ byte[] data = content.getContent();
+
+ if (minConfidence >= 0 && DETECTABLES.contains(content.getContentType())
+ && data.length > MIN_LENGTH) {
+ CharsetMatch[] matches = null;
+
+ // do all these in a try/catch; setText and detect/detectAll
+ // will sometimes throw exceptions
+ try {
+ detector.enableInputFilter(filter);
+ if (data.length > MIN_LENGTH) {
+ detector.setText(data);
+ matches = detector.detectAll();
+ }
+ } catch (Exception e) {
+ LOG.debug("Exception from ICU4J (ignoring): ", e);
+ }
+
+ if (matches != null) {
+ for (CharsetMatch match : matches) {
+ addClue(match.getName(), "detect", match.getConfidence());
+ }
+ }
+ }
+
+ // add character encoding coming from HTTP response header
+ addClue(
+ parseCharacterEncoding(content.getMetadata().get(Response.CONTENT_TYPE)),
+ "header");
+ }
+
+ public void addClue(String value, String source, int confidence) {
+ if (value == null || "".equals(value)) {
+ return;
+ }
+ value = resolveEncodingAlias(value);
+ if (value != null) {
+ clues.add(new EncodingClue(value, source, confidence));
+ }
+ }
+
+ public void addClue(String value, String source) {
+ addClue(value, source, NO_THRESHOLD);
+ }
+
+ /**
+ * Guess the encoding with the previously specified list of clues.
+ *
+ * @param content
+ * Content instance
+ * @param defaultValue
+ * Default encoding to return if no encoding can be detected with
+ * enough confidence. Note that this will <b>not</b> be normalized
+ * with {@link EncodingDetector#resolveEncodingAlias}
+ *
+ * @return Guessed encoding or defaultValue
+ */
+ public String guessEncoding(Content content, String defaultValue) {
+ /*
+ * This algorithm could be replaced by something more sophisticated; ideally
+ * we would gather a bunch of data on where various clues (autodetect, HTTP
+ * headers, HTML meta tags, etc.) disagree, tag each with the correct
+ * answer, and use machine learning/some statistical method to generate a
+ * better heuristic.
+ */
+
+ String base = content.getBaseUrl();
+
+ if (LOG.isTraceEnabled()) {
+ findDisagreements(base, clues);
+ }
+
+ /*
+ * Go down the list of encoding "clues". Use a clue if: 1. Has a confidence
+ * value which meets our confidence threshold, OR 2. Doesn't meet the
+ * threshold, but is the best try, since nothing else is available.
+ */
+ EncodingClue defaultClue = new EncodingClue(defaultValue, "default");
+ EncodingClue bestClue = defaultClue;
+
+ for (EncodingClue clue : clues) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(base + ": charset " + clue);
+ }
+ String charset = clue.value;
+ if (minConfidence >= 0 && clue.confidence >= minConfidence) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(base + ": Choosing encoding: " + charset
+ + " with confidence " + clue.confidence);
+ }
+ return resolveEncodingAlias(charset).toLowerCase();
+ } else if (clue.confidence == NO_THRESHOLD && bestClue == defaultClue) {
+ bestClue = clue;
+ }
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(base + ": Choosing encoding: " + bestClue);
+ }
+ return bestClue.value.toLowerCase();
+ }
+
+ /** Clears all clues. */
+ public void clearClues() {
+ clues.clear();
+ }
+
+ /*
+ * Strictly for analysis, look for "disagreements." The top guess from each
+ * source is examined; if these meet the threshold and disagree, then we log
+ * the information -- useful for testing or generating training data for a
+ * better heuristic.
+ */
+ private void findDisagreements(String url, List<EncodingClue> newClues) {
+ HashSet<String> valsSeen = new HashSet<String>();
+ HashSet<String> sourcesSeen = new HashSet<String>();
+ boolean disagreement = false;
+ for (int i = 0; i < newClues.size(); i++) {
+ EncodingClue clue = newClues.get(i);
+ if (!clue.isEmpty() && !sourcesSeen.contains(clue.source)) {
+ if (valsSeen.size() > 0 && !valsSeen.contains(clue.value)
+ && clue.meetsThreshold()) {
+ disagreement = true;
+ }
+ if (clue.meetsThreshold()) {
+ valsSeen.add(clue.value);
+ }
+ sourcesSeen.add(clue.source);
+ }
+ }
+ if (disagreement) {
+ // dump all values in case of disagreement
+ StringBuffer sb = new StringBuffer();
+ sb.append("Disagreement: " + url + "; ");
+ for (int i = 0; i < newClues.size(); i++) {
+ if (i > 0) {
+ sb.append(", ");
+ }
+ sb.append(newClues.get(i));
+ }
+ LOG.trace(sb.toString());
+ }
+ }
+
+ public static String resolveEncodingAlias(String encoding) {
+ try {
+ if (encoding == null || !Charset.isSupported(encoding))
+ return null;
+ String canonicalName = new String(Charset.forName(encoding).name());
+ return ALIASES.containsKey(canonicalName) ? ALIASES.get(canonicalName)
+ : canonicalName;
+ } catch (Exception e) {
+ LOG.warn("Invalid encoding " + encoding + " detected, using default.");
+ return null;
+ }
+ }
+
+ /**
+ * Parse the character encoding from the specified content type header. If the
+ * content type is null, or there is no explicit character encoding,
+ * <code>null</code> is returned. <br />
+ * This method was copied from org.apache.catalina.util.RequestUtil, which is
+ * licensed under the Apache License, Version 2.0 (the "License").
+ *
+ * @param contentType
+ * a content type header
+ */
+ public static String parseCharacterEncoding(String contentType) {
+ if (contentType == null)
+ return (null);
+ int start = contentType.indexOf("charset=");
+ if (start < 0)
+ return (null);
+ String encoding = contentType.substring(start + 8);
+ int end = encoding.indexOf(';');
+ if (end >= 0)
+ encoding = encoding.substring(0, end);
+ encoding = encoding.trim();
+ if ((encoding.length() > 2) && (encoding.startsWith("\""))
+ && (encoding.endsWith("\"")))
+ encoding = encoding.substring(1, encoding.length() - 1);
+ return (encoding.trim());
+
+ }
+
+ public static void main(String[] args) throws IOException {
+ if (args.length != 1) {
+ System.err.println("Usage: EncodingDetector <file>");
+ System.exit(1);
+ }
+
+ Configuration conf = NutchConfiguration.create();
+ EncodingDetector detector = new EncodingDetector(
+ NutchConfiguration.create());
+
+ // do everything as bytes; don't want any conversion
+ BufferedInputStream istr = new BufferedInputStream(new FileInputStream(
+ args[0]));
+ ByteArrayOutputStream ostr = new ByteArrayOutputStream();
+ byte[] bytes = new byte[1000];
+ boolean more = true;
+ while (more) {
+ int len = istr.read(bytes);
+ if (len < bytes.length) {
+ more = false;
+ if (len > 0) {
+ ostr.write(bytes, 0, len);
+ }
+ } else {
+ ostr.write(bytes);
+ }
+ }
+
+ byte[] data = ostr.toByteArray();
+
+ // make a fake Content
+ Content content = new Content("", "", data, "text/html", new Metadata(),
+ conf);
+
+ detector.autoDetectClues(content, true);
+ String encoding = detector.guessEncoding(content,
+ conf.get("parser.character.encoding.default"));
+ System.out.println("Guessed encoding: " + encoding);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java
new file mode 100644
index 0000000..6aed8d5
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/FSUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+
+/**
+ * Utility methods for common filesystem operations.
+ */
+public class FSUtils {
+
+ /**
+ * Replaces the current path with the new path and if set removes the old
+ * path. If removeOld is set to false then the old path will be set to the
+ * name current.old.
+ *
+ * @param fs
+ * The FileSystem.
+ * @param current
+ * The end path, the one being replaced.
+ * @param replacement
+ * The path to replace with.
+ * @param removeOld
+ * True if we are removing the current path.
+ *
+ * @throws IOException
+ * If an error occurs during replacement.
+ */
+ public static void replace(FileSystem fs, Path current, Path replacement,
+ boolean removeOld) throws IOException {
+
+ // rename any current path to old
+ Path old = new Path(current + ".old");
+ if (fs.exists(current)) {
+ fs.rename(current, old);
+ }
+
+ // rename the new path to current and remove the old path if needed
+ fs.rename(replacement, current);
+ if (fs.exists(old) && removeOld) {
+ fs.delete(old, true);
+ }
+ }
+
+ /**
+ * Closes a group of SequenceFile readers.
+ *
+ * @param readers
+ * The SequenceFile readers to close.
+ * @throws IOException
+ * If an error occurs while closing a reader.
+ */
+ public static void closeReaders(SequenceFile.Reader[] readers)
+ throws IOException {
+
+ // loop through the readers, closing one by one
+ if (readers != null) {
+ for (int i = 0; i < readers.length; i++) {
+ SequenceFile.Reader reader = readers[i];
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+
+ /**
+ * Closes a group of MapFile readers.
+ *
+ * @param readers
+ * The MapFile readers to close.
+ * @throws IOException
+ * If an error occurs while closing a reader.
+ */
+ public static void closeReaders(MapFile.Reader[] readers) throws IOException {
+
+ // loop through the readers closing one by one
+ if (readers != null) {
+ for (int i = 0; i < readers.length; i++) {
+ MapFile.Reader reader = readers[i];
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java b/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java
new file mode 100644
index 0000000..63b10e2
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/GZIPUtils.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+// Slf4j Logging imports
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection of utility methods for working on GZIPed data.
+ */
+public class GZIPUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(GZIPUtils.class);
+ private static final int EXPECTED_COMPRESSION_RATIO = 5;
+ private static final int BUF_SIZE = 4096;
+
+ /**
+ * Returns an gunzipped copy of the input array. If the gzipped input has been
+ * truncated or corrupted, a best-effort attempt is made to unzip as much as
+ * possible. If no data can be extracted <code>null</code> is returned.
+ */
+ public static final byte[] unzipBestEffort(byte[] in) {
+ return unzipBestEffort(in, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Returns an gunzipped copy of the input array, truncated to
+ * <code>sizeLimit</code> bytes, if necessary. If the gzipped input has been
+ * truncated or corrupted, a best-effort attempt is made to unzip as much as
+ * possible. If no data can be extracted <code>null</code> is returned.
+ */
+ public static final byte[] unzipBestEffort(byte[] in, int sizeLimit) {
+ try {
+ // decompress using GZIPInputStream
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+ EXPECTED_COMPRESSION_RATIO * in.length);
+
+ GZIPInputStream inStream = new GZIPInputStream(new ByteArrayInputStream(
+ in));
+
+ byte[] buf = new byte[BUF_SIZE];
+ int written = 0;
+ while (true) {
+ try {
+ int size = inStream.read(buf);
+ if (size <= 0)
+ break;
+ if ((written + size) > sizeLimit) {
+ outStream.write(buf, 0, sizeLimit - written);
+ break;
+ }
+ outStream.write(buf, 0, size);
+ written += size;
+ } catch (Exception e) {
+ break;
+ }
+ }
+ try {
+ outStream.close();
+ } catch (IOException e) {
+ }
+
+ return outStream.toByteArray();
+
+ } catch (IOException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Returns an gunzipped copy of the input array.
+ *
+ * @throws IOException
+ * if the input cannot be properly decompressed
+ */
+ public static final byte[] unzip(byte[] in) throws IOException {
+ // decompress using GZIPInputStream
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream(
+ EXPECTED_COMPRESSION_RATIO * in.length);
+
+ GZIPInputStream inStream = new GZIPInputStream(new ByteArrayInputStream(in));
+
+ byte[] buf = new byte[BUF_SIZE];
+ while (true) {
+ int size = inStream.read(buf);
+ if (size <= 0)
+ break;
+ outStream.write(buf, 0, size);
+ }
+ outStream.close();
+
+ return outStream.toByteArray();
+ }
+
+ /**
+ * Returns an gzipped copy of the input array.
+ */
+ public static final byte[] zip(byte[] in) {
+ try {
+ // compress using GZIPOutputStream
+ ByteArrayOutputStream byteOut = new ByteArrayOutputStream(in.length
+ / EXPECTED_COMPRESSION_RATIO);
+
+ GZIPOutputStream outStream = new GZIPOutputStream(byteOut);
+
+ try {
+ outStream.write(in);
+ } catch (Exception e) {
+ LOG.error("Error writing outStream: ", e);
+ }
+
+ try {
+ outStream.close();
+ } catch (IOException e) {
+ LOG.error("Error closing outStream: ", e);
+ }
+
+ return byteOut.toByteArray();
+
+ } catch (IOException e) {
+ LOG.error("Error: ", e);
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java b/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java
new file mode 100644
index 0000000..755aad0
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/GenericWritableConfigurable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.GenericWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A generic Writable wrapper that can inject Configuration to
+ * {@link Configurable}s
+ */
+public abstract class GenericWritableConfigurable extends GenericWritable
+ implements Configurable {
+
+ private Configuration conf;
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ byte type = in.readByte();
+ Class<?> clazz = getTypes()[type];
+ try {
+ set((Writable) clazz.newInstance());
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new IOException("Cannot initialize the class: " + clazz);
+ }
+ Writable w = get();
+ if (w instanceof Configurable)
+ ((Configurable) w).setConf(conf);
+ w.readFields(in);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
new file mode 100644
index 0000000..6f471c1
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/HadoopFSUtil.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+public class HadoopFSUtil {
+
+ /**
+ * Returns PathFilter that passes all paths through.
+ */
+ public static PathFilter getPassAllFilter() {
+ return new PathFilter() {
+ public boolean accept(Path arg0) {
+ return true;
+ }
+ };
+ }
+
+ /**
+ * Returns PathFilter that passes directories through.
+ */
+ public static PathFilter getPassDirectoriesFilter(final FileSystem fs) {
+ return new PathFilter() {
+ public boolean accept(final Path path) {
+ try {
+ return fs.getFileStatus(path).isDirectory();
+ } catch (IOException ioe) {
+ return false;
+ }
+ }
+
+ };
+ }
+
+ /**
+ * Turns an array of FileStatus into an array of Paths.
+ */
+ public static Path[] getPaths(FileStatus[] stats) {
+ if (stats == null) {
+ return null;
+ }
+ if (stats.length == 0) {
+ return new Path[0];
+ }
+ Path[] res = new Path[stats.length];
+ for (int i = 0; i < stats.length; i++) {
+ res[i] = stats[i].getPath();
+ }
+ return res;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
new file mode 100644
index 0000000..656a458
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/JexlUtil.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.util;
+
+import java.util.Date;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.jexl2.Expression;
+import org.apache.commons.jexl2.JexlEngine;
+import org.apache.commons.lang.time.DateUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for working with Jexl expressions.
+ */
+public class JexlUtil {
+
+  public static final Logger LOG = LoggerFactory.getLogger(JexlUtil.class);
+
+  /**
+   * Matches date literals of the form yyyy-MM-ddTHH:mm:ssZ, for example
+   * 2016-03-20T00:00:00Z.
+   */
+  public static Pattern datePattern = Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}Z");
+
+  /**
+   * Parses the given expression to a Jexl expression. Every date literal
+   * matched by {@link #datePattern} is replaced by its value in milliseconds
+   * since the epoch before the expression is handed to Jexl.
+   *
+   * @param expr the Jexl expression, may be null
+   * @return parsed Jexl expression, or null if {@code expr} is null or could
+   *         not be parsed (the failure is logged)
+   */
+  public static Expression parseExpression(String expr) {
+    if (expr == null) return null;
+
+    try {
+      // Translate every date literal into a long. Dates must be specified as
+      // yyyy-MM-dd'T'HH:mm:ss'Z', e.g. 2016-03-20T00:00:00Z
+      // NOTE(review): the 'Z' is quoted in the parse pattern, so it is matched
+      // as a literal character; the date is presumably interpreted in the JVM
+      // default time zone rather than UTC - confirm.
+      Matcher matcher = datePattern.matcher(expr);
+      StringBuffer translated = new StringBuffer();
+      while (matcher.find()) {
+        // Parse the matched literal and get the epoch
+        Date parsedDate = DateUtils.parseDateStrictly(matcher.group(), new String[] {"yyyy-MM-dd'T'HH:mm:ss'Z'"});
+        long time = parsedDate.getTime();
+
+        // Replace this occurrence; quoteReplacement keeps the number from
+        // being read as a group reference
+        matcher.appendReplacement(translated, Matcher.quoteReplacement(Long.toString(time)));
+      }
+      matcher.appendTail(translated);
+      expr = translated.toString();
+
+      JexlEngine jexl = new JexlEngine();
+      jexl.setSilent(true);
+      jexl.setStrict(true);
+      return jexl.createExpression(expr);
+    } catch (Exception e) {
+      // Log the expression and the full exception, not just its message
+      LOG.error("Failed to parse Jexl expression: " + expr, e);
+    }
+
+    return null;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java b/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
new file mode 100644
index 0000000..7e3bb97
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/util/LockUtil.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Utility methods for handling application-level locking.
+ *
+ * @author Andrzej Bialecki
+ */
+public class LockUtil {
+
+ /**
+ * Create a lock file.
+ *
+ * @param fs
+ * filesystem
+ * @param lockFile
+ * name of the lock file
+ * @param accept
+ * if true, and the target file exists, consider it valid. If false
+ * and the target file exists, throw an IOException.
+ * @throws IOException
+ * if accept is false, and the target file already exists, or if
+ * it's a directory.
+ */
+ public static void createLockFile(FileSystem fs, Path lockFile, boolean accept)
+ throws IOException {
+ if (fs.exists(lockFile)) {
+ if (!accept)
+ throw new IOException("lock file " + lockFile + " already exists.");
+ if (fs.getFileStatus(lockFile).isDirectory())
+ throw new IOException("lock file " + lockFile
+ + " already exists and is a directory.");
+ // do nothing - the file already exists.
+ } else {
+ // make sure parents exist
+ fs.mkdirs(lockFile.getParent());
+ fs.createNewFile(lockFile);
+ }
+ }
+
+ /**
+ * Remove lock file. NOTE: applications enforce the semantics of this file -
+ * this method simply removes any file with a given name.
+ *
+ * @param fs
+ * filesystem
+ * @param lockFile
+ * lock file name
+ * @return false, if the lock file doesn't exist. True, if it existed and was
+ * successfully removed.
+ * @throws IOException
+ * if lock file exists but it is a directory.
+ */
+ public static boolean removeLockFile(FileSystem fs, Path lockFile)
+ throws IOException {
+ if (!fs.exists(lockFile))
+ return false;
+ if (fs.getFileStatus(lockFile).isDirectory())
+ throw new IOException("lock file " + lockFile
+ + " exists but is a directory!");
+ return fs.delete(lockFile, false);
+ }
+}