Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:54 UTC
[38/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build
for nutch-core and nutch-plugins
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java
new file mode 100644
index 0000000..ef12f52
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java
@@ -0,0 +1,793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.MapFile.Writer.Option;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.metadata.MetaWrapper;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * This tool takes several segments and merges their data together. Only the
+ * latest version of the data is retained.
+ * <p>
+ * Optionally, you can apply current URLFilters to remove prohibited URLs.
+ * </p>
+ * <p>
+ * Also, it's possible to slice the resulting segment into chunks of fixed size.
+ * </p>
+ * <h3>Important Notes</h3>
+ * <h4>Which parts are merged?</h4>
+ * <p>
+ * It doesn't make sense to merge data from segments that are at different
+ * stages of processing (e.g. one unfetched segment, one fetched but not
+ * parsed, and one fetched and parsed). Therefore, prior to merging, the tool
+ * determines the lowest common set of input data, and only this data is
+ * merged. This may have unintended consequences: e.g. if the majority of
+ * input segments are fetched and parsed, but one of them is unfetched, the
+ * tool will fall back to merging just the fetchlists, and it will skip all
+ * other data from all segments.
+ * </p>
+ * <h4>Merging fetchlists</h4>
+ * <p>
+ * Merging segments that contain just fetchlists (i.e. prior to fetching) is
+ * not recommended, because this tool (unlike
+ * {@link org.apache.nutch.crawl.Generator}) doesn't ensure that fetchlist
+ * parts for each map task are disjoint.
+ * </p>
+ * <h4>Duplicate content</h4>
+ * <p>
+ * Merging segments removes older content whenever possible (see below).
+ * However, this is NOT the same as de-duplication, which in addition removes
+ * identical content found at different URLs. In other words, running
+ * DeleteDuplicates is still necessary.
+ * </p>
+ * <p>
+ * For some types of data (especially ParseText) it's not possible to determine
+ * which version is really older. Therefore the tool always uses segment names
+ * as timestamps, for all types of input data. Segment names are compared in
+ * forward lexicographic order (0-9a-zA-Z), and data from segments with "higher"
+ * names will prevail. It follows then that it is extremely important that
+ * segments be named in an increasing lexicographic order as their creation time
+ * increases.
+ * </p>
+ * <h4>Merging and indexes</h4>
+ * <p>
+ * The merged segment gets a different name. Since Indexer embeds segment names
+ * in indexes, any indexes originally created for the input segments will NOT
+ * work with the merged segment. The newly created merged segment(s) need to be
+ * indexed afresh. This tool doesn't use existing indexes in any way, so if you
+ * plan to merge segments you don't have to index them prior to merging.
+ * </p>
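+ * <h4>Usage</h4>
+ * <p>
+ * A typical invocation (a sketch; the paths are hypothetical):
+ * <pre>
+ * bin/nutch mergesegs crawl/merged_segments -dir crawl/segments -filter -normalize -slice 50000
+ * </pre>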
+ *
+ *
+ * @author Andrzej Bialecki
+ */
+public class SegmentMerger extends Configured implements Tool,
+ Mapper<Text, MetaWrapper, Text, MetaWrapper>,
+ Reducer<Text, MetaWrapper, Text, MetaWrapper> {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(SegmentMerger.class);
+
+ private static final String SEGMENT_PART_KEY = "part";
+ private static final String SEGMENT_SLICE_KEY = "slice";
+
+ private URLFilters filters = null;
+ private URLNormalizers normalizers = null;
+ private SegmentMergeFilters mergeFilters = null;
+ private long sliceSize = -1;
+ private long curCount = 0;
+
+ /**
+   * Wraps inputs in a {@link MetaWrapper}, to permit merging different types
+   * in the reducer and to carry additional metadata.
+ */
+ public static class ObjectInputFormat extends
+ SequenceFileInputFormat<Text, MetaWrapper> {
+
+ @Override
+ public RecordReader<Text, MetaWrapper> getRecordReader(
+ final InputSplit split, final JobConf job, Reporter reporter)
+ throws IOException {
+
+ reporter.setStatus(split.toString());
+
+ // find part name
+ SegmentPart segmentPart;
+ final String spString;
+ final FileSplit fSplit = (FileSplit) split;
+ try {
+ segmentPart = SegmentPart.get(fSplit);
+ spString = segmentPart.toString();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot identify segment:", e);
+ }
+
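+      // Open the underlying file once to discover the concrete value class
+      // (CrawlDatum, Content, ParseData, or ParseText) of this segment part.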
+ SequenceFile.Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(fSplit.getPath()));
+
+ final Writable w;
+ try {
+ w = (Writable) reader.getValueClass().newInstance();
+ } catch (Exception e) {
+ throw new IOException(e.toString());
+ } finally {
+ try {
+ reader.close();
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ final SequenceFileRecordReader<Text, Writable> splitReader = new SequenceFileRecordReader<Text, Writable>(
+ job, (FileSplit) split);
+
+ try {
+ return new SequenceFileRecordReader<Text, MetaWrapper>(job, fSplit) {
+
+ public synchronized boolean next(Text key, MetaWrapper wrapper)
+ throws IOException {
+ LOG.debug("Running OIF.next()");
+
+ boolean res = splitReader.next(key, w);
+ wrapper.set(w);
+ wrapper.setMeta(SEGMENT_PART_KEY, spString);
+ return res;
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ splitReader.close();
+ }
+
+ @Override
+ public MetaWrapper createValue() {
+ return new MetaWrapper();
+ }
+
+ };
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot create RecordReader: ", e);
+ }
+ }
+ }
+
+ public static class SegmentOutputFormat extends
+ FileOutputFormat<Text, MetaWrapper> {
+ private static final String DEFAULT_SLICE = "default";
+
+ @Override
+ public RecordWriter<Text, MetaWrapper> getRecordWriter(final FileSystem fs,
+ final JobConf job, final String name, final Progressable progress)
+ throws IOException {
+ return new RecordWriter<Text, MetaWrapper>() {
+ MapFile.Writer c_out = null;
+ MapFile.Writer f_out = null;
+ MapFile.Writer pd_out = null;
+ MapFile.Writer pt_out = null;
+ SequenceFile.Writer g_out = null;
+ SequenceFile.Writer p_out = null;
+ HashMap<String, Closeable> sliceWriters = new HashMap<String, Closeable>();
+ String segmentName = job.get("segment.merger.segmentName");
+
+ public void write(Text key, MetaWrapper wrapper) throws IOException {
+ // unwrap
+ SegmentPart sp = SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY));
+ Writable o = wrapper.get();
+ String slice = wrapper.getMeta(SEGMENT_SLICE_KEY);
+ if (o instanceof CrawlDatum) {
+ if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+ g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
+ g_out.append(key, o);
+ } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
+ f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME,
+ CrawlDatum.class);
+ f_out.append(key, o);
+ } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
+ p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
+ p_out.append(key, o);
+ } else {
+ throw new IOException("Cannot determine segment part: "
+ + sp.partName);
+ }
+ } else if (o instanceof Content) {
+ c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
+ c_out.append(key, o);
+ } else if (o instanceof ParseData) {
+ // update the segment name inside contentMeta - required by Indexer
+ if (slice == null) {
+ ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
+ segmentName);
+ } else {
+ ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
+ segmentName + "-" + slice);
+ }
+ pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
+ pd_out.append(key, o);
+ } else if (o instanceof ParseText) {
+ pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);
+ pt_out.append(key, o);
+ }
+ }
+
+        // Lazily create SequenceFile writers, cached per (slice + dirName) key.
+ private SequenceFile.Writer ensureSequenceFile(String slice,
+ String dirName) throws IOException {
+ if (slice == null)
+ slice = DEFAULT_SLICE;
+ SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters
+ .get(slice + dirName);
+ if (res != null)
+ return res;
+ Path wname;
+ Path out = FileOutputFormat.getOutputPath(job);
+          if (DEFAULT_SLICE.equals(slice)) {
+ wname = new Path(new Path(new Path(out, segmentName), dirName),
+ name);
+ } else {
+ wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
+ dirName), name);
+ }
+
+
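+        // The value class is always CrawlDatum here: this writer is only used
+        // for the crawl_generate and crawl_parse parts (see write() above).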
+ res = SequenceFile.createWriter(job, SequenceFile.Writer.file(wname),
+ SequenceFile.Writer.keyClass(Text.class),
+ SequenceFile.Writer.valueClass(CrawlDatum.class),
+ SequenceFile.Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size",4096)),
+ SequenceFile.Writer.replication(fs.getDefaultReplication(wname)),
+ SequenceFile.Writer.blockSize(1073741824),
+ SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(job), new DefaultCodec()),
+ SequenceFile.Writer.progressable(progress),
+ SequenceFile.Writer.metadata(new Metadata()));
+
+ sliceWriters.put(slice + dirName, res);
+ return res;
+ }
+
+        // Lazily create MapFile writers, cached per (slice + dirName) key.
+ private MapFile.Writer ensureMapFile(String slice, String dirName,
+ Class<? extends Writable> clazz) throws IOException {
+ if (slice == null)
+ slice = DEFAULT_SLICE;
+ MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice
+ + dirName);
+ if (res != null)
+ return res;
+ Path wname;
+ Path out = FileOutputFormat.getOutputPath(job);
+          if (DEFAULT_SLICE.equals(slice)) {
+ wname = new Path(new Path(new Path(out, segmentName), dirName),
+ name);
+ } else {
+ wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
+ dirName), name);
+ }
+ CompressionType compType = SequenceFileOutputFormat
+ .getOutputCompressionType(job);
+ if (clazz.isAssignableFrom(ParseText.class)) {
+ compType = CompressionType.RECORD;
+ }
+
+          Option rKeyClassOpt = MapFile.Writer.keyClass(Text.class);
+ org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = SequenceFile.Writer.valueClass(clazz);
+ org.apache.hadoop.io.SequenceFile.Writer.Option rProgressOpt = SequenceFile.Writer.progressable(progress);
+ org.apache.hadoop.io.SequenceFile.Writer.Option rCompOpt = SequenceFile.Writer.compression(compType);
+
+ res = new MapFile.Writer(job, wname, rKeyClassOpt,
+ rValClassOpt, rCompOpt, rProgressOpt);
+ sliceWriters.put(slice + dirName, res);
+ return res;
+ }
+
+ public void close(Reporter reporter) throws IOException {
+ Iterator<Closeable> it = sliceWriters.values().iterator();
+ while (it.hasNext()) {
+ Object o = it.next();
+ if (o instanceof SequenceFile.Writer) {
+ ((SequenceFile.Writer) o).close();
+ } else {
+ ((MapFile.Writer) o).close();
+ }
+ }
+ }
+ };
+ }
+ }
+
+ public SegmentMerger() {
+ super(null);
+ }
+
+ public SegmentMerger(Configuration conf) {
+ super(conf);
+ }
+
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ if (conf == null)
+ return;
+ if (conf.getBoolean("segment.merger.filter", false)) {
+ filters = new URLFilters(conf);
+ mergeFilters = new SegmentMergeFilters(conf);
+ }
+ if (conf.getBoolean("segment.merger.normalizer", false))
+ normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+ sliceSize = conf.getLong("segment.merger.slice", -1);
+ if ((sliceSize > 0) && (LOG.isInfoEnabled())) {
+ LOG.info("Slice size: " + sliceSize + " URLs.");
+ }
+ }
+
+ public void close() throws IOException {
+ }
+
+ public void configure(JobConf conf) {
+ setConf(conf);
+ if (sliceSize > 0) {
+ sliceSize = sliceSize / conf.getNumReduceTasks();
+ }
+ }
+
+ private Text newKey = new Text();
+
+ public void map(Text key, MetaWrapper value,
+ OutputCollector<Text, MetaWrapper> output, Reporter reporter)
+ throws IOException {
+ String url = key.toString();
+ if (normalizers != null) {
+ try {
+        // normalize the url
+        url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT);
+ } catch (Exception e) {
+ LOG.warn("Skipping " + url + ":" + e.getMessage());
+ url = null;
+ }
+ }
+ if (url != null && filters != null) {
+ try {
+ url = filters.filter(url);
+ } catch (Exception e) {
+ LOG.warn("Skipping key " + url + ": " + e.getMessage());
+ url = null;
+ }
+ }
+ if (url != null) {
+ newKey.set(url);
+ output.collect(newKey, value);
+ }
+ }
+
+ /**
+ * NOTE: in selecting the latest version we rely exclusively on the segment
+ * name (not all segment data contain time information). Therefore it is
+ * extremely important that segments be named in an increasing lexicographic
+ * order as their creation time increases.
+ */
+ public void reduce(Text key, Iterator<MetaWrapper> values,
+ OutputCollector<Text, MetaWrapper> output, Reporter reporter)
+ throws IOException {
+ CrawlDatum lastG = null;
+ CrawlDatum lastF = null;
+ CrawlDatum lastSig = null;
+ Content lastC = null;
+ ParseData lastPD = null;
+ ParseText lastPT = null;
+ String lastGname = null;
+ String lastFname = null;
+ String lastSigname = null;
+ String lastCname = null;
+ String lastPDname = null;
+ String lastPTname = null;
+ TreeMap<String, ArrayList<CrawlDatum>> linked = new TreeMap<String, ArrayList<CrawlDatum>>();
+ while (values.hasNext()) {
+ MetaWrapper wrapper = values.next();
+ Object o = wrapper.get();
+ String spString = wrapper.getMeta(SEGMENT_PART_KEY);
+ if (spString == null) {
+ throw new IOException("Null segment part, key=" + key);
+ }
+ SegmentPart sp = SegmentPart.parse(spString);
+ if (o instanceof CrawlDatum) {
+ CrawlDatum val = (CrawlDatum) o;
+ // check which output dir it belongs to
+ if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+ if (lastG == null) {
+ lastG = val;
+ lastGname = sp.segmentName;
+ } else {
+ // take newer
+ if (lastGname.compareTo(sp.segmentName) < 0) {
+ lastG = val;
+ lastGname = sp.segmentName;
+ }
+ }
+ } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
+ // only consider fetch status and ignore fetch retry status
+ // https://issues.apache.org/jira/browse/NUTCH-1520
+ // https://issues.apache.org/jira/browse/NUTCH-1113
+ if (CrawlDatum.hasFetchStatus(val)
+ && val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY
+ && val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+ if (lastF == null) {
+ lastF = val;
+ lastFname = sp.segmentName;
+ } else {
+ if (lastFname.compareTo(sp.segmentName) < 0) {
+ lastF = val;
+ lastFname = sp.segmentName;
+ }
+ }
+ }
+ } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
+ if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) {
+ if (lastSig == null) {
+ lastSig = val;
+ lastSigname = sp.segmentName;
+ } else {
+ // take newer
+ if (lastSigname.compareTo(sp.segmentName) < 0) {
+ lastSig = val;
+ lastSigname = sp.segmentName;
+ }
+ }
+ continue;
+ }
+ // collect all LINKED values from the latest segment
+ ArrayList<CrawlDatum> segLinked = linked.get(sp.segmentName);
+ if (segLinked == null) {
+ segLinked = new ArrayList<CrawlDatum>();
+ linked.put(sp.segmentName, segLinked);
+ }
+ segLinked.add(val);
+ } else {
+ throw new IOException("Cannot determine segment part: " + sp.partName);
+ }
+ } else if (o instanceof Content) {
+ if (lastC == null) {
+ lastC = (Content) o;
+ lastCname = sp.segmentName;
+ } else {
+ if (lastCname.compareTo(sp.segmentName) < 0) {
+ lastC = (Content) o;
+ lastCname = sp.segmentName;
+ }
+ }
+ } else if (o instanceof ParseData) {
+ if (lastPD == null) {
+ lastPD = (ParseData) o;
+ lastPDname = sp.segmentName;
+ } else {
+ if (lastPDname.compareTo(sp.segmentName) < 0) {
+ lastPD = (ParseData) o;
+ lastPDname = sp.segmentName;
+ }
+ }
+ } else if (o instanceof ParseText) {
+ if (lastPT == null) {
+ lastPT = (ParseText) o;
+ lastPTname = sp.segmentName;
+ } else {
+ if (lastPTname.compareTo(sp.segmentName) < 0) {
+ lastPT = (ParseText) o;
+ lastPTname = sp.segmentName;
+ }
+ }
+ }
+ }
+ // perform filtering based on full merge record
+ if (mergeFilters != null
+ && !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD,
+ lastPT, linked.isEmpty() ? null : linked.lastEntry().getValue())) {
+ return;
+ }
+
+ curCount++;
+ String sliceName = null;
+ MetaWrapper wrapper = new MetaWrapper();
+ if (sliceSize > 0) {
+ sliceName = String.valueOf(curCount / sliceSize);
+ wrapper.setMeta(SEGMENT_SLICE_KEY, sliceName);
+ }
+ SegmentPart sp = new SegmentPart();
+ // now output the latest values
+ if (lastG != null) {
+ wrapper.set(lastG);
+ sp.partName = CrawlDatum.GENERATE_DIR_NAME;
+ sp.segmentName = lastGname;
+ wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+ output.collect(key, wrapper);
+ }
+ if (lastF != null) {
+ wrapper.set(lastF);
+ sp.partName = CrawlDatum.FETCH_DIR_NAME;
+ sp.segmentName = lastFname;
+ wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+ output.collect(key, wrapper);
+ }
+ if (lastSig != null) {
+ wrapper.set(lastSig);
+ sp.partName = CrawlDatum.PARSE_DIR_NAME;
+ sp.segmentName = lastSigname;
+ wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+ output.collect(key, wrapper);
+ }
+ if (lastC != null) {
+ wrapper.set(lastC);
+ sp.partName = Content.DIR_NAME;
+ sp.segmentName = lastCname;
+ wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+ output.collect(key, wrapper);
+ }
+ if (lastPD != null) {
+ wrapper.set(lastPD);
+ sp.partName = ParseData.DIR_NAME;
+ sp.segmentName = lastPDname;
+ wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+ output.collect(key, wrapper);
+ }
+ if (lastPT != null) {
+ wrapper.set(lastPT);
+ sp.partName = ParseText.DIR_NAME;
+ sp.segmentName = lastPTname;
+ wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+ output.collect(key, wrapper);
+ }
+ if (linked.size() > 0) {
+ String name = linked.lastKey();
+ sp.partName = CrawlDatum.PARSE_DIR_NAME;
+ sp.segmentName = name;
+ wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+ ArrayList<CrawlDatum> segLinked = linked.get(name);
+ for (int i = 0; i < segLinked.size(); i++) {
+ CrawlDatum link = segLinked.get(i);
+ wrapper.set(link);
+ output.collect(key, wrapper);
+ }
+ }
+ }
+
+ public void merge(Path out, Path[] segs, boolean filter, boolean normalize,
+ long slice) throws Exception {
+ String segmentName = Generator.generateSegmentName();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Merging " + segs.length + " segments to " + out + "/"
+ + segmentName);
+ }
+ JobConf job = new NutchJob(getConf());
+ job.setJobName("mergesegs " + out + "/" + segmentName);
+ job.setBoolean("segment.merger.filter", filter);
+ job.setBoolean("segment.merger.normalizer", normalize);
+ job.setLong("segment.merger.slice", slice);
+ job.set("segment.merger.segmentName", segmentName);
+ FileSystem fs = FileSystem.get(getConf());
+ // prepare the minimal common set of input dirs
+ boolean g = true;
+ boolean f = true;
+ boolean p = true;
+ boolean c = true;
+ boolean pd = true;
+ boolean pt = true;
+
+    // These hold the previous values; we use them to track changes in the loop
+ boolean pg = true;
+ boolean pf = true;
+ boolean pp = true;
+ boolean pc = true;
+ boolean ppd = true;
+ boolean ppt = true;
+ for (int i = 0; i < segs.length; i++) {
+ if (!fs.exists(segs[i])) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Input dir " + segs[i] + " doesn't exist, skipping.");
+ }
+ segs[i] = null;
+ continue;
+ }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("SegmentMerger: adding " + segs[i]);
+ }
+ Path cDir = new Path(segs[i], Content.DIR_NAME);
+ Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+ Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+ Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+ Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+ Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
+ c = c && fs.exists(cDir);
+ g = g && fs.exists(gDir);
+ f = f && fs.exists(fDir);
+ p = p && fs.exists(pDir);
+ pd = pd && fs.exists(pdDir);
+ pt = pt && fs.exists(ptDir);
+
+ // Input changed?
+ if (g != pg || f != pf || p != pp || c != pc || pd != ppd || pt != ppt) {
+ LOG.info(segs[i] + " changed input dirs");
+ }
+
+ pg = g; pf = f; pp = p; pc = c; ppd = pd; ppt = pt;
+ }
+ StringBuffer sb = new StringBuffer();
+ if (c)
+ sb.append(" " + Content.DIR_NAME);
+ if (g)
+ sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
+ if (f)
+ sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
+ if (p)
+ sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
+ if (pd)
+ sb.append(" " + ParseData.DIR_NAME);
+ if (pt)
+ sb.append(" " + ParseText.DIR_NAME);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("SegmentMerger: using segment data from:" + sb.toString());
+ }
+ for (int i = 0; i < segs.length; i++) {
+ if (segs[i] == null)
+ continue;
+ if (g) {
+ Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+ FileInputFormat.addInputPath(job, gDir);
+ }
+ if (c) {
+ Path cDir = new Path(segs[i], Content.DIR_NAME);
+ FileInputFormat.addInputPath(job, cDir);
+ }
+ if (f) {
+ Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+ FileInputFormat.addInputPath(job, fDir);
+ }
+ if (p) {
+ Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+ FileInputFormat.addInputPath(job, pDir);
+ }
+ if (pd) {
+ Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+ FileInputFormat.addInputPath(job, pdDir);
+ }
+ if (pt) {
+ Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
+ FileInputFormat.addInputPath(job, ptDir);
+ }
+ }
+ job.setInputFormat(ObjectInputFormat.class);
+ job.setMapperClass(SegmentMerger.class);
+ job.setReducerClass(SegmentMerger.class);
+ FileOutputFormat.setOutputPath(job, out);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(MetaWrapper.class);
+ job.setOutputFormat(SegmentOutputFormat.class);
+
+ setConf(job);
+
+ JobClient.runJob(job);
+ }
+
+  /**
+   * Run the segment merger as a command-line tool.
+   *
+   * @param args
+   *          command-line arguments; see the usage message
+   */
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+      System.err
+          .println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-normalize] [-slice NNNN]");
+ System.err
+ .println("\toutput_dir\tname of the parent dir for output segment slice(s)");
+ System.err
+ .println("\t-dir segments\tparent dir containing several segments");
+ System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
+ System.err
+ .println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters");
+ System.err
+ .println("\t-normalize\t\tnormalize URL via current URLNormalizers");
+ System.err
+ .println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs");
+ return -1;
+ }
+ Configuration conf = NutchConfiguration.create();
+ final FileSystem fs = FileSystem.get(conf);
+ Path out = new Path(args[0]);
+ ArrayList<Path> segs = new ArrayList<Path>();
+ long sliceSize = 0;
+ boolean filter = false;
+ boolean normalize = false;
+ for (int i = 1; i < args.length; i++) {
+ if (args[i].equals("-dir")) {
+ FileStatus[] fstats = fs.listStatus(new Path(args[++i]),
+ HadoopFSUtil.getPassDirectoriesFilter(fs));
+ Path[] files = HadoopFSUtil.getPaths(fstats);
+ for (int j = 0; j < files.length; j++)
+ segs.add(files[j]);
+ } else if (args[i].equals("-filter")) {
+ filter = true;
+ } else if (args[i].equals("-normalize")) {
+ normalize = true;
+ } else if (args[i].equals("-slice")) {
+ sliceSize = Long.parseLong(args[++i]);
+ } else {
+ segs.add(new Path(args[i]));
+ }
+ }
+ if (segs.size() == 0) {
+ System.err.println("ERROR: No input segments.");
+ return -1;
+ }
+
+ merge(out, segs.toArray(new Path[segs.size()]), filter, normalize,
+ sliceSize);
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int result = ToolRunner.run(NutchConfiguration.create(),
+ new SegmentMerger(), args);
+ System.exit(result);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
new file mode 100644
index 0000000..84247e4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Utility class for handling information about segment parts.
+ *
+ * @author Andrzej Bialecki
+ */
+public class SegmentPart {
+ /** Name of the segment (just the last path component). */
+ public String segmentName;
+  /** Name of the segment part (i.e. one of the subdirectories inside a segment). */
+ public String partName;
+
+  public SegmentPart() {
+  }
+
+ public SegmentPart(String segmentName, String partName) {
+ this.segmentName = segmentName;
+ this.partName = partName;
+ }
+
+ /**
+ * Return a String representation of this class, in the form
+ * "segmentName/partName".
+ */
+ public String toString() {
+ return segmentName + "/" + partName;
+ }
+
+ /**
+ * Create SegmentPart from a FileSplit.
+ *
+   * @param split
+   *          a FileSplit pointing into a segment part
+   * @return A {@link SegmentPart} resultant from a {@link FileSplit}.
+   * @throws IOException
+   *           if the split path does not contain the required components.
+ */
+ public static SegmentPart get(FileSplit split) throws IOException {
+ return get(split.getPath().toString());
+ }
+
+ /**
+ * Create SegmentPart from a full path of a location inside any segment part.
+ *
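+   * For example (a sketch, with a hypothetical segment name):
+   *
+   * <pre>
+   * SegmentPart sp = SegmentPart.get("crawl/segments/20160716191254/crawl_fetch/part-00000");
+   * // sp.segmentName == "20160716191254", sp.partName == "crawl_fetch"
+   * </pre>
+   *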
+ * @param path
+ * full path into a segment part (may include "part-xxxxx"
+ * components)
+ * @return SegmentPart instance describing this part.
+ * @throws IOException
+ * if any required path components are missing.
+ */
+ public static SegmentPart get(String path) throws IOException {
+ // find part name
+ String dir = path.replace('\\', '/');
+ int idx = dir.lastIndexOf("/part-");
+ if (idx == -1) {
+ throw new IOException("Cannot determine segment part: " + dir);
+ }
+ dir = dir.substring(0, idx);
+ idx = dir.lastIndexOf('/');
+ if (idx == -1) {
+ throw new IOException("Cannot determine segment part: " + dir);
+ }
+ String part = dir.substring(idx + 1);
+ // find segment name
+ dir = dir.substring(0, idx);
+ idx = dir.lastIndexOf('/');
+ if (idx == -1) {
+ throw new IOException("Cannot determine segment name: " + dir);
+ }
+ String segment = dir.substring(idx + 1);
+ return new SegmentPart(segment, part);
+ }
+
+ /**
+ * Create SegmentPart from a String in format "segmentName/partName".
+ *
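+   * For example (hypothetical segment name):
+   *
+   * <pre>
+   * SegmentPart sp = SegmentPart.parse("20160716191254/crawl_parse");
+   * // sp.segmentName == "20160716191254", sp.partName == "crawl_parse"
+   * </pre>
+   *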
+ * @param string
+ * input String
+ * @return parsed instance of SegmentPart
+ * @throws IOException
+ * if "/" is missing.
+ */
+ public static SegmentPart parse(String string) throws IOException {
+ int idx = string.indexOf('/');
+ if (idx == -1) {
+ throw new IOException("Invalid SegmentPart: '" + string + "'");
+ }
+ String segment = string.substring(0, idx);
+ String part = string.substring(idx + 1);
+ return new SegmentPart(segment, part);
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java
new file mode 100644
index 0000000..d00d1e2
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java
@@ -0,0 +1,719 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * Dump the content of a segment.
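+ * <p>
+ * Command-line sketch (the segment path is hypothetical):
+ * <pre>
+ * bin/nutch readseg -dump crawl/segments/20160716191254 dump_dir -nocontent
+ * </pre>
+ */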
+public class SegmentReader extends Configured implements
+ Reducer<Text, NutchWritable, Text, Text> {
+
+ public static final Logger LOG = LoggerFactory.getLogger(SegmentReader.class);
+
+ long recNo = 0L;
+
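+  // which parts of the segment to read: content, crawl_fetch, crawl_generate,
+  // crawl_parse, parse_data, and parse_text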
+ private boolean co, fe, ge, pa, pd, pt;
+ private FileSystem fs;
+
+ public static class InputCompatMapper extends MapReduceBase implements
+ Mapper<WritableComparable<?>, Writable, Text, NutchWritable> {
+ private Text newKey = new Text();
+
+ public void map(WritableComparable<?> key, Writable value,
+ OutputCollector<Text, NutchWritable> collector, Reporter reporter)
+ throws IOException {
+ // convert on the fly from old formats with UTF8 keys.
+ // UTF8 deprecated and replaced by Text.
+ if (key instanceof Text) {
+ newKey.set(key.toString());
+ key = newKey;
+ }
+ collector.collect((Text) key, new NutchWritable(value));
+ }
+
+ }
+
+ /** Implements a text output format */
+ public static class TextOutputFormat extends
+ FileOutputFormat<WritableComparable<?>, Writable> {
+ public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
+ final FileSystem fs, JobConf job, String name,
+ final Progressable progress) throws IOException {
+
+ final Path segmentDumpFile = new Path(
+ FileOutputFormat.getOutputPath(job), name);
+
+ // Get the old copy out of the way
+ if (fs.exists(segmentDumpFile))
+ fs.delete(segmentDumpFile, true);
+
+ final PrintStream printStream = new PrintStream(
+ fs.create(segmentDumpFile));
+ return new RecordWriter<WritableComparable<?>, Writable>() {
+ public synchronized void write(WritableComparable<?> key, Writable value)
+ throws IOException {
+ printStream.println(value);
+ }
+
+ public synchronized void close(Reporter reporter) throws IOException {
+ printStream.close();
+ }
+ };
+ }
+ }
+
+ public SegmentReader() {
+ super(null);
+ }
+
+ public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge,
+ boolean pa, boolean pd, boolean pt) {
+ super(conf);
+ this.co = co;
+ this.fe = fe;
+ this.ge = ge;
+ this.pa = pa;
+ this.pd = pd;
+ this.pt = pt;
+ try {
+ this.fs = FileSystem.get(getConf());
+ } catch (IOException e) {
+ LOG.error("IOException:", e);
+ }
+ }
+
+ public void configure(JobConf job) {
+ setConf(job);
+ this.co = getConf().getBoolean("segment.reader.co", true);
+ this.fe = getConf().getBoolean("segment.reader.fe", true);
+ this.ge = getConf().getBoolean("segment.reader.ge", true);
+ this.pa = getConf().getBoolean("segment.reader.pa", true);
+ this.pd = getConf().getBoolean("segment.reader.pd", true);
+ this.pt = getConf().getBoolean("segment.reader.pt", true);
+ try {
+ this.fs = FileSystem.get(getConf());
+ } catch (IOException e) {
+ LOG.error("IOException:", e);
+ }
+ }
+
+ private JobConf createJobConf() {
+ JobConf job = new NutchJob(getConf());
+ job.setBoolean("segment.reader.co", this.co);
+ job.setBoolean("segment.reader.fe", this.fe);
+ job.setBoolean("segment.reader.ge", this.ge);
+ job.setBoolean("segment.reader.pa", this.pa);
+ job.setBoolean("segment.reader.pd", this.pd);
+ job.setBoolean("segment.reader.pt", this.pt);
+ return job;
+ }
+
+ public void close() {
+ }
+
+ public void reduce(Text key, Iterator<NutchWritable> values,
+ OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+ StringBuffer dump = new StringBuffer();
+
+ dump.append("\nRecno:: ").append(recNo++).append("\n");
+ dump.append("URL:: " + key.toString() + "\n");
+ while (values.hasNext()) {
+ Writable value = values.next().get(); // unwrap
+ if (value instanceof CrawlDatum) {
+ dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
+ } else if (value instanceof Content) {
+ dump.append("\nContent::\n").append(((Content) value).toString());
+ } else if (value instanceof ParseData) {
+ dump.append("\nParseData::\n").append(((ParseData) value).toString());
+ } else if (value instanceof ParseText) {
+ dump.append("\nParseText::\n").append(((ParseText) value).toString());
+ } else if (LOG.isWarnEnabled()) {
+ LOG.warn("Unrecognized type: " + value.getClass());
+ }
+ }
+ output.collect(key, new Text(dump.toString()));
+ }
+
+ public void dump(Path segment, Path output) throws IOException {
+
+ if (LOG.isInfoEnabled()) {
+ LOG.info("SegmentReader: dump segment: " + segment);
+ }
+
+ JobConf job = createJobConf();
+ job.setJobName("read " + segment);
+
+ if (ge)
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.GENERATE_DIR_NAME));
+ if (fe)
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.FETCH_DIR_NAME));
+ if (pa)
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.PARSE_DIR_NAME));
+ if (co)
+ FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+ if (pd)
+ FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
+ if (pt)
+ FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
+
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setMapperClass(InputCompatMapper.class);
+ job.setReducerClass(SegmentReader.class);
+
+ Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-"
+ + new java.util.Random().nextInt());
+ fs.delete(tempDir, true);
+
+ FileOutputFormat.setOutputPath(job, tempDir);
+ job.setOutputFormat(TextOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(NutchWritable.class);
+
+ JobClient.runJob(job);
+
+ // concatenate the output
+ Path dumpFile = new Path(output, job.get("segment.dump.dir", "dump"));
+
+ // remove the old file
+ fs.delete(dumpFile, true);
+ FileStatus[] fstats = fs.listStatus(tempDir,
+ HadoopFSUtil.getPassAllFilter());
+ Path[] files = HadoopFSUtil.getPaths(fstats);
+
+ PrintWriter writer = null;
+ int currentRecordNumber = 0;
+ if (files.length > 0) {
+ writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
+ fs.create(dumpFile))));
+ try {
+ for (int i = 0; i < files.length; i++) {
+ Path partFile = files[i];
+ try {
+ currentRecordNumber = append(fs, job, partFile, writer,
+ currentRecordNumber);
+ } catch (IOException exception) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Couldn't copy the content of " + partFile.toString()
+ + " into " + dumpFile.toString());
+ LOG.warn(exception.getMessage());
+ }
+ }
+ }
+ } finally {
+ writer.close();
+ }
+ }
+ fs.delete(tempDir, true);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("SegmentReader: done");
+ }
+ }
+
+  /** Appends the content of one part file to the dump and updates the Recno counter. */
+ private int append(FileSystem fs, Configuration conf, Path src,
+ PrintWriter writer, int currentRecordNumber) throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ fs.open(src)));
+ try {
+ String line = reader.readLine();
+ while (line != null) {
+ if (line.startsWith("Recno:: ")) {
+ line = "Recno:: " + currentRecordNumber++;
+ }
+ writer.println(line);
+ line = reader.readLine();
+ }
+ return currentRecordNumber;
+ } finally {
+ reader.close();
+ }
+ }
+
+ private static final String[][] keys = new String[][] {
+ { "co", "Content::\n" }, { "ge", "Crawl Generate::\n" },
+ { "fe", "Crawl Fetch::\n" }, { "pa", "Crawl Parse::\n" },
+ { "pd", "ParseData::\n" }, { "pt", "ParseText::\n" } };
+
+ public void get(final Path segment, final Text key, Writer writer,
+ final Map<String, List<Writable>> results) throws Exception {
+ LOG.info("SegmentReader: get '" + key + "'");
+ ArrayList<Thread> threads = new ArrayList<Thread>();
+ if (co)
+ threads.add(new Thread() {
+ public void run() {
+ try {
+ List<Writable> res = getMapRecords(new Path(segment,
+ Content.DIR_NAME), key);
+ results.put("co", res);
+ } catch (Exception e) {
+ LOG.error("Exception:", e);
+ }
+ }
+ });
+ if (fe)
+ threads.add(new Thread() {
+ public void run() {
+ try {
+ List<Writable> res = getMapRecords(new Path(segment,
+ CrawlDatum.FETCH_DIR_NAME), key);
+ results.put("fe", res);
+ } catch (Exception e) {
+ LOG.error("Exception:", e);
+ }
+ }
+ });
+ if (ge)
+ threads.add(new Thread() {
+ public void run() {
+ try {
+ List<Writable> res = getSeqRecords(new Path(segment,
+ CrawlDatum.GENERATE_DIR_NAME), key);
+ results.put("ge", res);
+ } catch (Exception e) {
+ LOG.error("Exception:", e);
+ }
+ }
+ });
+ if (pa)
+ threads.add(new Thread() {
+ public void run() {
+ try {
+ List<Writable> res = getSeqRecords(new Path(segment,
+ CrawlDatum.PARSE_DIR_NAME), key);
+ results.put("pa", res);
+ } catch (Exception e) {
+ LOG.error("Exception:", e);
+ }
+ }
+ });
+ if (pd)
+ threads.add(new Thread() {
+ public void run() {
+ try {
+ List<Writable> res = getMapRecords(new Path(segment,
+ ParseData.DIR_NAME), key);
+ results.put("pd", res);
+ } catch (Exception e) {
+ LOG.error("Exception:", e);
+ }
+ }
+ });
+ if (pt)
+ threads.add(new Thread() {
+ public void run() {
+ try {
+ List<Writable> res = getMapRecords(new Path(segment,
+ ParseText.DIR_NAME), key);
+ results.put("pt", res);
+ } catch (Exception e) {
+ LOG.error("Exception:", e);
+ }
+ }
+ });
+ Iterator<Thread> it = threads.iterator();
+ while (it.hasNext())
+ it.next().start();
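+    // poll every 5 seconds until all retrieval threads have finished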
+ int cnt;
+ do {
+ cnt = 0;
+ try {
+ Thread.sleep(5000);
+      } catch (Exception e) {
+        // ignore interrupted sleep
+      }
+ it = threads.iterator();
+ while (it.hasNext()) {
+ if (it.next().isAlive())
+ cnt++;
+ }
+ if ((cnt > 0) && (LOG.isDebugEnabled())) {
+ LOG.debug("(" + cnt + " to retrieve)");
+ }
+ } while (cnt > 0);
+ for (int i = 0; i < keys.length; i++) {
+ List<Writable> res = results.get(keys[i][0]);
+ if (res != null && res.size() > 0) {
+ for (int k = 0; k < res.size(); k++) {
+ writer.write(keys[i][1]);
+ writer.write(res.get(k) + "\n");
+ }
+ }
+ writer.flush();
+ }
+ }
+
+ private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
+ MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir,
+ getConf());
+ ArrayList<Writable> res = new ArrayList<Writable>();
+ Class<?> keyClass = readers[0].getKeyClass();
+ Class<?> valueClass = readers[0].getValueClass();
+ if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
+ throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+ Writable value = (Writable) valueClass.newInstance();
+    // we don't know the partitioning scheme, so check every part file
+ for (int i = 0; i < readers.length; i++) {
+ if (readers[i].get(key, value) != null) {
+ res.add(value);
+ value = (Writable) valueClass.newInstance();
+ Text aKey = (Text) keyClass.newInstance();
+ while (readers[i].next(aKey, value) && aKey.equals(key)) {
+ res.add(value);
+ value = (Writable) valueClass.newInstance();
+ }
+ }
+ readers[i].close();
+ }
+ return res;
+ }
+
+ private List<Writable> getSeqRecords(Path dir, Text key) throws Exception {
+ SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(
+ getConf(), dir);
+ ArrayList<Writable> res = new ArrayList<Writable>();
+ Class<?> keyClass = readers[0].getKeyClass();
+ Class<?> valueClass = readers[0].getValueClass();
+ if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
+ throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+ Writable aKey = (Writable) keyClass.newInstance();
+ Writable value = (Writable) valueClass.newInstance();
+ for (int i = 0; i < readers.length; i++) {
+ while (readers[i].next(aKey, value)) {
+ if (aKey.equals(key)) {
+ res.add(value);
+ value = (Writable) valueClass.newInstance();
+ }
+ }
+ readers[i].close();
+ }
+ return res;
+ }
+
+ public static class SegmentReaderStats {
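+    // a value of -1 means "not collected"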
+ public long start = -1L;
+ public long end = -1L;
+ public long generated = -1L;
+ public long fetched = -1L;
+ public long fetchErrors = -1L;
+ public long parsed = -1L;
+ public long parseErrors = -1L;
+ }
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+
+ public void list(List<Path> dirs, Writer writer) throws Exception {
+ writer
+ .write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n");
+ for (int i = 0; i < dirs.size(); i++) {
+ Path dir = dirs.get(i);
+ SegmentReaderStats stats = new SegmentReaderStats();
+ getStats(dir, stats);
+ writer.write(dir.getName() + "\t");
+ if (stats.generated == -1)
+ writer.write("?");
+ else
+ writer.write(stats.generated + "");
+ writer.write("\t\t");
+ if (stats.start == -1)
+ writer.write("?\t");
+ else
+ writer.write(sdf.format(new Date(stats.start)));
+ writer.write("\t");
+ if (stats.end == -1)
+ writer.write("?");
+ else
+ writer.write(sdf.format(new Date(stats.end)));
+ writer.write("\t");
+ if (stats.fetched == -1)
+ writer.write("?");
+ else
+ writer.write(stats.fetched + "");
+ writer.write("\t");
+ if (stats.parsed == -1)
+ writer.write("?");
+ else
+ writer.write(stats.parsed + "");
+ writer.write("\n");
+ writer.flush();
+ }
+ }
+
+ public void getStats(Path segment, final SegmentReaderStats stats)
+ throws Exception {
+ long cnt = 0L;
+ Text key = new Text();
+
+ if (ge) {
+ SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(
+ getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+ for (int i = 0; i < readers.length; i++) {
+ while (readers[i].next(key))
+ cnt++;
+ readers[i].close();
+ }
+ stats.generated = cnt;
+ }
+
+ if (fe) {
+ Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
+ if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDirectory()) {
+ cnt = 0L;
+ long start = Long.MAX_VALUE;
+ long end = Long.MIN_VALUE;
+ CrawlDatum value = new CrawlDatum();
+ MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir,
+ getConf());
+ for (int i = 0; i < mreaders.length; i++) {
+ while (mreaders[i].next(key, value)) {
+ cnt++;
+ if (value.getFetchTime() < start)
+ start = value.getFetchTime();
+ if (value.getFetchTime() > end)
+ end = value.getFetchTime();
+ }
+ mreaders[i].close();
+ }
+ stats.start = start;
+ stats.end = end;
+ stats.fetched = cnt;
+ }
+ }
+
+ if (pd) {
+ Path parseDir = new Path(segment, ParseData.DIR_NAME);
+ if (fs.exists(parseDir) && fs.getFileStatus(parseDir).isDirectory()) {
+ cnt = 0L;
+ long errors = 0L;
+ ParseData value = new ParseData();
+ MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir,
+ getConf());
+ for (int i = 0; i < mreaders.length; i++) {
+ while (mreaders[i].next(key, value)) {
+ cnt++;
+ if (!value.getStatus().isSuccess())
+ errors++;
+ }
+ mreaders[i].close();
+ }
+ stats.parsed = cnt;
+ stats.parseErrors = errors;
+ }
+ }
+ }
+
+ private static final int MODE_DUMP = 0;
+
+ private static final int MODE_LIST = 1;
+
+ private static final int MODE_GET = 2;
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage();
+ return;
+ }
+ int mode = -1;
+ if (args[0].equals("-dump"))
+ mode = MODE_DUMP;
+ else if (args[0].equals("-list"))
+ mode = MODE_LIST;
+ else if (args[0].equals("-get"))
+ mode = MODE_GET;
+
+ boolean co = true;
+ boolean fe = true;
+ boolean ge = true;
+ boolean pa = true;
+ boolean pd = true;
+ boolean pt = true;
+ // collect general options
+ for (int i = 1; i < args.length; i++) {
+ if (args[i].equals("-nocontent")) {
+ co = false;
+ args[i] = null;
+ } else if (args[i].equals("-nofetch")) {
+ fe = false;
+ args[i] = null;
+ } else if (args[i].equals("-nogenerate")) {
+ ge = false;
+ args[i] = null;
+ } else if (args[i].equals("-noparse")) {
+ pa = false;
+ args[i] = null;
+ } else if (args[i].equals("-noparsedata")) {
+ pd = false;
+ args[i] = null;
+ } else if (args[i].equals("-noparsetext")) {
+ pt = false;
+ args[i] = null;
+ }
+ }
+ Configuration conf = NutchConfiguration.create();
+ final FileSystem fs = FileSystem.get(conf);
+ SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd,
+ pt);
+ // collect required args
+ switch (mode) {
+ case MODE_DUMP:
+ String input = args[1];
+ if (input == null) {
+ System.err.println("Missing required argument: <segment_dir>");
+ usage();
+ return;
+ }
+ String output = args.length > 2 ? args[2] : null;
+ if (output == null) {
+ System.err.println("Missing required argument: <output>");
+ usage();
+ return;
+ }
+ segmentReader.dump(new Path(input), new Path(output));
+ return;
+ case MODE_LIST:
+ ArrayList<Path> dirs = new ArrayList<Path>();
+ for (int i = 1; i < args.length; i++) {
+ if (args[i] == null)
+ continue;
+ if (args[i].equals("-dir")) {
+ Path dir = new Path(args[++i]);
+ FileStatus[] fstats = fs.listStatus(dir,
+ HadoopFSUtil.getPassDirectoriesFilter(fs));
+ Path[] files = HadoopFSUtil.getPaths(fstats);
+ if (files != null && files.length > 0) {
+ dirs.addAll(Arrays.asList(files));
+ }
+ } else
+ dirs.add(new Path(args[i]));
+ }
+ segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
+ return;
+ case MODE_GET:
+ input = args[1];
+ if (input == null) {
+ System.err.println("Missing required argument: <segment_dir>");
+ usage();
+ return;
+ }
+ String key = args.length > 2 ? args[2] : null;
+ if (key == null) {
+ System.err.println("Missing required argument: <keyValue>");
+ usage();
+ return;
+ }
+ segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter(
+ System.out, "UTF-8"), new HashMap<String, List<Writable>>());
+ return;
+ default:
+ System.err.println("Invalid operation: " + args[0]);
+ usage();
+ return;
+ }
+ }
+
+ private static void usage() {
+ System.err
+ .println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n");
+ System.err.println("* General options:");
+ System.err.println("\t-nocontent\tignore content directory");
+ System.err.println("\t-nofetch\tignore crawl_fetch directory");
+ System.err.println("\t-nogenerate\tignore crawl_generate directory");
+ System.err.println("\t-noparse\tignore crawl_parse directory");
+ System.err.println("\t-noparsedata\tignore parse_data directory");
+ System.err.println("\t-noparsetext\tignore parse_text directory");
+ System.err.println();
+ System.err
+ .println("* SegmentReader -dump <segment_dir> <output> [general options]");
+ System.err
+ .println(" Dumps content of a <segment_dir> as a text file to <output>.\n");
+ System.err.println("\t<segment_dir>\tname of the segment directory.");
+ System.err
+ .println("\t<output>\tname of the (non-existent) output directory.");
+ System.err.println();
+ System.err
+ .println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]");
+ System.err
+ .println(" List a synopsis of segments in specified directories, or all segments in");
+ System.err
+ .println(" a directory <segments>, and print it on System.out\n");
+ System.err
+ .println("\t<segment_dir1> ...\tlist of segment directories to process");
+ System.err
+ .println("\t-dir <segments>\t\tdirectory that contains multiple segments");
+ System.err.println();
+ System.err
+ .println("* SegmentReader -get <segment_dir> <keyValue> [general options]");
+ System.err
+ .println(" Get a specified record from a segment, and print it on System.out.\n");
+ System.err.println("\t<segment_dir>\tname of the segment directory.");
+ System.err.println("\t<keyValue>\tvalue of the key (url).");
+ System.err
+ .println("\t\tNote: put double-quotes around strings with spaces.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java b/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java
new file mode 100644
index 0000000..ecc0c26
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A segment stores all data from one generate/fetch/update cycle:
+ * fetch list, protocol status, raw content, parsed content, and extracted outgoing links.
+ */
+package org.apache.nutch.segment;
+
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java b/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java
new file mode 100644
index 0000000..c71cfa9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.model.request.NutchConfig;
+
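+/**
+ * Manages the {@link Configuration} instances used by the Nutch service:
+ * configurations are identified by a confId and can be created from a
+ * {@link NutchConfig}, listed, read as a map, modified, and deleted.
+ */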
+public interface ConfManager {
+
+ public Configuration get(String confId);
+
+ public Map<String, String> getAsMap(String confId);
+
+ public void setProperty(String confId, String propName, String propValue);
+
+ public Set<String> list();
+
+ public String create(NutchConfig nutchConfig);
+
+ public void delete(String confId);
+}
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java b/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java
new file mode 100644
index 0000000..20346fc
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service;
+
+import java.util.Collection;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+
+public interface JobManager {
+
+  public static enum JobType {
+    INJECT, GENERATE, FETCH, PARSE, UPDATEDB, INDEX, READDB, CLASS, INVERTLINKS, DEDUP
+  }
+
+  public Collection<JobInfo> list(String crawlId, State state);
+
+ public JobInfo get(String crawlId, String id);
+
+  /**
+   * Creates and runs the job specified by the given configuration.
+   * @param jobConfig the requested job configuration
+   * @return a JobInfo describing the newly created job
+   */
+ public JobInfo create(JobConfig jobConfig);
+
+ public boolean abort(String crawlId, String id);
+
+ public boolean stop(String crawlId, String id);
+}
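
A rough usage sketch follows. The JobConfig setters shown are assumed to
mirror the getters that JobManagerImpl (further below) calls; "crawl-01" and
"job-123" are placeholder identifiers:

    import java.util.Collection;
    import java.util.HashMap;

    import org.apache.nutch.service.JobManager;
    import org.apache.nutch.service.JobManager.JobType;
    import org.apache.nutch.service.model.request.JobConfig;
    import org.apache.nutch.service.model.response.JobInfo;
    import org.apache.nutch.service.model.response.JobInfo.State;

    public class JobManagerExample {
      static void run(JobManager manager) {
        // Submit an inject job against the default configuration.
        JobConfig jobConfig = new JobConfig();
        jobConfig.setType(JobType.INJECT);                // assumed setter for getType()
        jobConfig.setConfId("default");                   // assumed setter for getConfId()
        jobConfig.setArgs(new HashMap<String, Object>()); // assumed Map-valued; create() rejects null args
        JobInfo submitted = manager.create(jobConfig);

        // Inspect and control jobs by crawl id and job id.
        Collection<JobInfo> running = manager.list("crawl-01", State.RUNNING);
        JobInfo one = manager.get("crawl-01", "job-123");
        manager.stop("crawl-01", "job-123");
      }
    }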
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java b/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java
new file mode 100644
index 0000000..00bb78f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service;
+
+import java.io.FileNotFoundException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.impl.SequenceReader;
+import org.apache.nutch.util.NutchConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface NutchReader {
+
+ public static final Logger LOG = LoggerFactory.getLogger(NutchReader.class);
+ public static final Configuration conf = NutchConfiguration.create();
+
+ public List read(String path) throws FileNotFoundException;
+ public List head(String path, int nrows) throws FileNotFoundException;
+ public List slice(String path, int start, int end) throws FileNotFoundException;
+ public int count(String path) throws FileNotFoundException;
+}
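
Implementations (e.g. SequenceReader, imported above) adapt concrete storage
formats to these four accessors. As an illustrative sketch only, a reader
over plain text files, one row per line:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;

    import org.apache.nutch.service.NutchReader;

    /** Illustrative only: a NutchReader over local plain text files. */
    public class PlainTextReader implements NutchReader {

      @Override
      public List read(String path) throws FileNotFoundException {
        return slice(path, 0, Integer.MAX_VALUE);
      }

      @Override
      public List head(String path, int nrows) throws FileNotFoundException {
        return slice(path, 0, nrows);
      }

      @Override
      public List slice(String path, int start, int end) throws FileNotFoundException {
        try {
          List<String> lines = Files.readAllLines(Paths.get(path));
          // Clamp the requested window to the actual number of rows.
          return lines.subList(Math.min(start, lines.size()),
                               Math.min(end, lines.size()));
        } catch (IOException e) {
          throw new FileNotFoundException(path);
        }
      }

      @Override
      public int count(String path) throws FileNotFoundException {
        return read(path).size();
      }
    }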
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java b/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java
new file mode 100644
index 0000000..e206707
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
+
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.cxf.binding.BindingFactoryManager;
+import org.apache.cxf.jaxrs.JAXRSBindingFactory;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.nutch.fetcher.FetchNodeDb;
+import org.apache.nutch.service.impl.ConfManagerImpl;
+import org.apache.nutch.service.impl.JobFactory;
+import org.apache.nutch.service.impl.JobManagerImpl;
+import org.apache.nutch.service.impl.NutchServerPoolExecutor;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.service.resources.AdminResource;
+import org.apache.nutch.service.resources.ConfigResource;
+import org.apache.nutch.service.resources.DbResource;
+import org.apache.nutch.service.resources.JobResource;
+import org.apache.nutch.service.resources.ReaderResouce;
+import org.apache.nutch.service.resources.SeedResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Queues;
+
+public class NutchServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NutchServer.class);
+
+ private static final String LOCALHOST = "localhost";
+ private static final Integer DEFAULT_PORT = 8081;
+ private static final int JOB_CAPACITY = 100;
+
+ private static Integer port = DEFAULT_PORT;
+ private static String host = LOCALHOST;
+
+ private static final String CMD_HELP = "help";
+ private static final String CMD_PORT = "port";
+ private static final String CMD_HOST = "host";
+
+ private long started;
+ private boolean running;
+ private ConfManager configManager;
+ private JobManager jobManager;
+ private JAXRSServerFactoryBean sf;
+
+ private static FetchNodeDb fetchNodeDb;
+
+ private static NutchServer server;
+
+ static {
+ server = new NutchServer();
+ }
+
+ private NutchServer() {
+ configManager = new ConfManagerImpl();
+ BlockingQueue<Runnable> runnables = Queues.newArrayBlockingQueue(JOB_CAPACITY);
+ NutchServerPoolExecutor executor = new NutchServerPoolExecutor(10, JOB_CAPACITY, 1, TimeUnit.HOURS, runnables);
+ jobManager = new JobManagerImpl(new JobFactory(), configManager, executor);
+ fetchNodeDb = FetchNodeDb.getInstance();
+
+ sf = new JAXRSServerFactoryBean();
+ BindingFactoryManager manager = sf.getBus().getExtension(BindingFactoryManager.class);
+ JAXRSBindingFactory factory = new JAXRSBindingFactory();
+ factory.setBus(sf.getBus());
+ manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory);
+ sf.setResourceClasses(getClasses());
+ sf.setResourceProviders(getResourceProviders());
+ sf.setProvider(new JacksonJaxbJsonProvider());
+
+ }
+
+ public static NutchServer getInstance() {
+ return server;
+ }
+
+ protected static void startServer() {
+ server.start();
+ }
+
+ private void start() {
+ LOG.info("Starting NutchServer on {}:{} ...", host, port);
+    try {
+      String address = "http://" + host + ":" + port;
+      sf.setAddress(address);
+      sf.create();
+    } catch (Exception e) {
+      throw new IllegalStateException("Server could not be started", e);
+    }
+
+ started = System.currentTimeMillis();
+ running = true;
+ LOG.info("Started Nutch Server on {}:{} at {}", new Object[] {host, port, started});
+ }
+
+ private List<Class<?>> getClasses() {
+ List<Class<?>> resources = new ArrayList<Class<?>>();
+ resources.add(JobResource.class);
+ resources.add(ConfigResource.class);
+ resources.add(DbResource.class);
+ resources.add(AdminResource.class);
+ resources.add(SeedResource.class);
+ resources.add(ReaderResouce.class);
+ return resources;
+ }
+
+ private List<ResourceProvider> getResourceProviders() {
+ List<ResourceProvider> resourceProviders = new ArrayList<ResourceProvider>();
+ resourceProviders.add(new SingletonResourceProvider(getConfManager()));
+ return resourceProviders;
+ }
+
+ public ConfManager getConfManager() {
+ return configManager;
+ }
+
+ public JobManager getJobManager() {
+ return jobManager;
+ }
+
+  public FetchNodeDb getFetchNodeDb() {
+    return fetchNodeDb;
+  }
+
+  public boolean isRunning() {
+    return running;
+  }
+
+  public long getStarted() {
+    return started;
+  }
+
+ public static void main(String[] args) throws ParseException {
+ CommandLineParser parser = new PosixParser();
+ Options options = createOptions();
+ CommandLine commandLine = parser.parse(options, args);
+ if (commandLine.hasOption(CMD_HELP)) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("NutchServer", options, true);
+ return;
+ }
+
+ if (commandLine.hasOption(CMD_PORT)) {
+ port = Integer.parseInt(commandLine.getOptionValue(CMD_PORT));
+ }
+
+ if (commandLine.hasOption(CMD_HOST)) {
+ host = commandLine.getOptionValue(CMD_HOST);
+ }
+
+ startServer();
+ }
+
+ private static Options createOptions() {
+ Options options = new Options();
+
+ OptionBuilder.withDescription("Show this help");
+ options.addOption(OptionBuilder.create(CMD_HELP));
+
+ OptionBuilder.withArgName("port");
+ OptionBuilder.hasOptionalArg();
+ OptionBuilder.withDescription("The port to run the Nutch Server. Default port 8081");
+ options.addOption(OptionBuilder.create(CMD_PORT));
+
+ OptionBuilder.withArgName("host");
+ OptionBuilder.hasOptionalArg();
+ OptionBuilder.withDescription("The host to bind the Nutch Server to. Default is localhost.");
+ options.addOption(OptionBuilder.create(CMD_HOST));
+
+ return options;
+ }
+
+  public boolean canStop(boolean force) {
+    if (force) {
+      return true;
+    }
+    Collection<JobInfo> jobs = getJobManager().list(null, State.RUNNING);
+    return jobs.isEmpty();
+  }
+
+ protected static void setPort(int port) {
+ NutchServer.port = port;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void stop() {
+ System.exit(0);
+ }
+}
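
Run standalone, the server takes -port and -host options (see createOptions
above). A minimal programmatic launch, equivalent to invoking main with those
flags:

    import org.apache.nutch.service.NutchServer;

    public class StartNutchServer {
      public static void main(String[] args) throws Exception {
        // Same effect as: java ... org.apache.nutch.service.NutchServer -port 8081
        NutchServer.main(new String[] { "-port", "8081", "-host", "localhost" });

        NutchServer server = NutchServer.getInstance();
        System.out.println("running=" + server.isRunning()
            + " since=" + server.getStarted() + " port=" + server.getPort());
      }
    }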
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java
new file mode 100644
index 0000000..0c08ce4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.ConfManager;
+import org.apache.nutch.service.model.request.NutchConfig;
+import org.apache.nutch.service.resources.ConfigResource;
+import org.apache.nutch.util.NutchConfiguration;
+
+import com.google.common.collect.Maps;
+
+public class ConfManagerImpl implements ConfManager {
+
+ private Map<String, Configuration> configurations = Maps.newConcurrentMap();
+
+ private AtomicInteger newConfigId = new AtomicInteger();
+
+ public ConfManagerImpl() {
+ configurations.put(ConfigResource.DEFAULT, NutchConfiguration.create());
+ }
+
+  /**
+   * Returns the configuration associated with the given confId,
+   * falling back to the default configuration when confId is null.
+   */
+ public Configuration get(String confId) {
+ if (confId == null) {
+ return configurations.get(ConfigResource.DEFAULT);
+ }
+ return configurations.get(confId);
+ }
+
+ public Map<String, String> getAsMap(String confId) {
+ Configuration configuration = configurations.get(confId);
+ if (configuration == null) {
+ return Collections.emptyMap();
+ }
+
+ Iterator<Entry<String, String>> iterator = configuration.iterator();
+ Map<String, String> configMap = Maps.newTreeMap();
+ while (iterator.hasNext()) {
+ Entry<String, String> entry = iterator.next();
+ configMap.put(entry.getKey(), entry.getValue());
+ }
+ return configMap;
+ }
+
+ /**
+ * Sets the given property in the configuration associated with the confId
+ */
+ public void setProperty(String confId, String propName, String propValue) {
+ if (!configurations.containsKey(confId)) {
+ throw new IllegalArgumentException("Unknown configId '" + confId + "'");
+ }
+ Configuration conf = configurations.get(confId);
+ conf.set(propName, propValue);
+ }
+
+ public Set<String> list() {
+ return configurations.keySet();
+ }
+
+  /**
+   * Creates a new configuration based on the values provided.
+   * @param nutchConfig the requested configuration values
+   * @return the confId of the newly created configuration
+   */
+ public String create(NutchConfig nutchConfig) {
+ if (StringUtils.isBlank(nutchConfig.getConfigId())) {
+ nutchConfig.setConfigId(String.valueOf(newConfigId.incrementAndGet()));
+ }
+
+ if (!canCreate(nutchConfig)) {
+ throw new IllegalArgumentException("Config already exists.");
+ }
+
+ createHadoopConfig(nutchConfig);
+ return nutchConfig.getConfigId();
+ }
+
+ public void delete(String confId) {
+ configurations.remove(confId);
+ }
+
+  private boolean canCreate(NutchConfig nutchConfig) {
+    return nutchConfig.isForce()
+        || !configurations.containsKey(nutchConfig.getConfigId());
+  }
+
+ private void createHadoopConfig(NutchConfig nutchConfig) {
+ Configuration conf = NutchConfiguration.create();
+ configurations.put(nutchConfig.getConfigId(), conf);
+
+ if (MapUtils.isEmpty(nutchConfig.getParams())) {
+ return;
+ }
+ for (Entry<String, String> e : nutchConfig.getParams().entrySet()) {
+ conf.set(e.getKey(), e.getValue());
+ }
+ }
+
+}
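
One behavioral detail worth noting: create() refuses a duplicate confId
unless the request is forced. A sketch, assuming NutchConfig exposes a
setForce(boolean) matching the isForce() getter used above:

    import org.apache.nutch.service.ConfManager;
    import org.apache.nutch.service.impl.ConfManagerImpl;
    import org.apache.nutch.service.model.request.NutchConfig;

    public class DuplicateConfigExample {
      public static void main(String[] args) {
        ConfManager confs = new ConfManagerImpl();

        NutchConfig first = new NutchConfig();
        first.setConfigId("shared");
        confs.create(first);                 // ok: id is free

        NutchConfig clash = new NutchConfig();
        clash.setConfigId("shared");
        // confs.create(clash);              // would throw IllegalArgumentException
        clash.setForce(true);                // assumed setter for isForce()
        confs.create(clash);                 // forced: replaces the existing config
      }
    }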
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java
new file mode 100644
index 0000000..a74e362
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service.impl;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.nutch.service.JobManager.JobType;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.DeduplicationJob;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.Injector;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.indexer.IndexingJob;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.util.NutchTool;
+
+import com.google.common.collect.Maps;
+
+public class JobFactory {
+ private static Map<JobType, Class<? extends NutchTool>> typeToClass;
+
+ static {
+ typeToClass = Maps.newHashMap();
+ typeToClass.put(JobType.INJECT, Injector.class);
+ typeToClass.put(JobType.GENERATE, Generator.class);
+ typeToClass.put(JobType.FETCH, Fetcher.class);
+ typeToClass.put(JobType.PARSE, ParseSegment.class);
+ typeToClass.put(JobType.INDEX, IndexingJob.class);
+ typeToClass.put(JobType.UPDATEDB, CrawlDb.class);
+ typeToClass.put(JobType.INVERTLINKS, LinkDb.class);
+ typeToClass.put(JobType.DEDUP, DeduplicationJob.class);
+ }
+
+ public NutchTool createToolByType(JobType type, Configuration conf) {
+ if (!typeToClass.containsKey(type)) {
+ return null;
+ }
+ Class<? extends NutchTool> clz = typeToClass.get(type);
+ return createTool(clz, conf);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public NutchTool createToolByClassName(String className, Configuration conf) {
+ try {
+ Class clz = Class.forName(className);
+ return createTool(clz, conf);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private NutchTool createTool(Class<? extends NutchTool> clz,
+ Configuration conf) {
+ return ReflectionUtils.newInstance(clz, conf);
+ }
+
+}
\ No newline at end of file
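
Both lookup paths of the factory in one sketch; Injector is one of the
classes registered in the static table above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.nutch.service.JobManager.JobType;
    import org.apache.nutch.service.impl.JobFactory;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.NutchTool;

    public class JobFactoryExample {
      public static void main(String[] args) {
        Configuration conf = NutchConfiguration.create();
        JobFactory factory = new JobFactory();

        // Well-known job types resolve through the static type-to-class map.
        NutchTool injector = factory.createToolByType(JobType.INJECT, conf);

        // Any NutchTool can also be resolved by fully qualified class name.
        NutchTool sameTool = factory.createToolByClassName(
            "org.apache.nutch.crawl.Injector", conf);

        System.out.println(injector.getClass() + " / " + sameTool.getClass());
      }
    }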
http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java
new file mode 100644
index 0000000..a915457
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collection;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.ConfManager;
+import org.apache.nutch.service.JobManager;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.util.NutchTool;
+
+public class JobManagerImpl implements JobManager {
+
+ private JobFactory jobFactory;
+ private NutchServerPoolExecutor executor;
+ private ConfManager configManager;
+
+ public JobManagerImpl(JobFactory jobFactory, ConfManager configManager, NutchServerPoolExecutor executor) {
+ this.jobFactory = jobFactory;
+ this.configManager = configManager;
+ this.executor = executor;
+ }
+
+ @Override
+ public JobInfo create(JobConfig jobConfig) {
+ if (jobConfig.getArgs() == null) {
+ throw new IllegalArgumentException("Arguments cannot be null!");
+ }
+ Configuration conf = cloneConfiguration(jobConfig.getConfId());
+ NutchTool tool = createTool(jobConfig, conf);
+ JobWorker worker = new JobWorker(jobConfig, conf, tool);
+ executor.execute(worker);
+ executor.purge();
+ return worker.getInfo();
+ }
+
+ private Configuration cloneConfiguration(String confId) {
+ Configuration conf = configManager.get(confId);
+ if (conf == null) {
+ throw new IllegalArgumentException("Unknown confId " + confId);
+ }
+ return new Configuration(conf);
+ }
+
+ @Override
+ public Collection<JobInfo> list(String crawlId, State state) {
+ if (state == null || state == State.ANY) {
+ return executor.getAllJobs();
+ }
+ if (state == State.RUNNING || state == State.IDLE) {
+ return executor.getJobRunning();
+ }
+ return executor.getJobHistory();
+ }
+
+ @Override
+ public JobInfo get(String crawlId, String jobId) {
+ return executor.getInfo(jobId);
+ }
+
+ @Override
+ public boolean abort(String crawlId, String id) {
+ return executor.findWorker(id).killJob();
+ }
+
+ @Override
+ public boolean stop(String crawlId, String id) {
+ return executor.findWorker(id).stopJob();
+ }
+
+  private NutchTool createTool(JobConfig jobConfig, Configuration conf) {
+    if (StringUtils.isNotBlank(jobConfig.getJobClassName())) {
+      return jobFactory.createToolByClassName(jobConfig.getJobClassName(), conf);
+    }
+    return jobFactory.createToolByType(jobConfig.getType(), conf);
+  }
+}
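
To tie the pieces together, the wiring NutchServer performs in its
constructor, extracted as a standalone sketch (the 10-thread pool and the
capacity of 100 mirror the constants used there):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    import com.google.common.collect.Queues;
    import org.apache.nutch.service.ConfManager;
    import org.apache.nutch.service.JobManager;
    import org.apache.nutch.service.impl.ConfManagerImpl;
    import org.apache.nutch.service.impl.JobFactory;
    import org.apache.nutch.service.impl.JobManagerImpl;
    import org.apache.nutch.service.impl.NutchServerPoolExecutor;

    public class JobManagerWiring {
      public static JobManager build() {
        ConfManager confs = new ConfManagerImpl();
        BlockingQueue<Runnable> queue = Queues.newArrayBlockingQueue(100);
        NutchServerPoolExecutor executor =
            new NutchServerPoolExecutor(10, 100, 1, TimeUnit.HOURS, queue);
        return new JobManagerImpl(new JobFactory(), confs, executor);
      }
    }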