You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by th...@apache.org on 2016/07/16 19:48:54 UTC

[38/51] [partial] nutch git commit: NUTCH-2292 : Mavenize the build for nutch-core and nutch-plugins

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java
new file mode 100644
index 0000000..ef12f52
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentMerger.java
@@ -0,0 +1,793 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TreeMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.MapFile.Writer.Option;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Metadata;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.metadata.MetaWrapper;
+import org.apache.nutch.metadata.Nutch;
+import org.apache.nutch.net.URLFilters;
+import org.apache.nutch.net.URLNormalizers;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/**
+ * This tool takes several segments and merges their data together. Only the
+ * latest versions of data is retained.
+ * <p>
+ * Optionally, you can apply current URLFilters to remove prohibited URL-s.
+ * </p>
+ * <p>
+ * Also, it's possible to slice the resulting segment into chunks of fixed size.
+ * </p>
+ * <h3>Important Notes</h3> <h4>Which parts are merged?</h4>
+ * <p>
+ * It doesn't make sense to merge data from segments, which are at different
+ * stages of processing (e.g. one unfetched segment, one fetched but not parsed,
+ * and one fetched and parsed). Therefore, prior to merging, the tool will
+ * determine the lowest common set of input data, and only this data will be
+ * merged. This may have some unintended consequences: e.g. if majority of input
+ * segments are fetched and parsed, but one of them is unfetched, the tool will
+ * fall back to just merging fetchlists, and it will skip all other data from
+ * all segments.
+ * </p>
+ * <h4>Merging fetchlists</h4>
+ * <p>
+ * Merging segments, which contain just fetchlists (i.e. prior to fetching) is
+ * not recommended, because this tool (unlike the
+ * {@link org.apache.nutch.crawl.Generator} doesn't ensure that fetchlist parts
+ * for each map task are disjoint.
+ * </p>
+ * <p>
+ * <h4>Duplicate content</h4>
+ * Merging segments removes older content whenever possible (see below).
+ * However, this is NOT the same as de-duplication, which in addition removes
+ * identical content found at different URL-s. In other words, running
+ * DeleteDuplicates is still necessary.
+ * </p>
+ * <p>
+ * For some types of data (especially ParseText) it's not possible to determine
+ * which version is really older. Therefore the tool always uses segment names
+ * as timestamps, for all types of input data. Segment names are compared in
+ * forward lexicographic order (0-9a-zA-Z), and data from segments with "higher"
+ * names will prevail. It follows then that it is extremely important that
+ * segments be named in an increasing lexicographic order as their creation time
+ * increases.
+ * </p>
+ * <p>
+ * <h4>Merging and indexes</h4>
+ * Merged segment gets a different name. Since Indexer embeds segment names in
+ * indexes, any indexes originally created for the input segments will NOT work
+ * with the merged segment. Newly created merged segment(s) need to be indexed
+ * afresh. This tool doesn't use existing indexes in any way, so if you plan to
+ * merge segments you don't have to index them prior to merging.
+ * 
+ * 
+ * @author Andrzej Bialecki
+ */
+public class SegmentMerger extends Configured implements Tool,
+    Mapper<Text, MetaWrapper, Text, MetaWrapper>,
+    Reducer<Text, MetaWrapper, Text, MetaWrapper> {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SegmentMerger.class);
+
+  private static final String SEGMENT_PART_KEY = "part";
+  private static final String SEGMENT_SLICE_KEY = "slice";
+
+  private URLFilters filters = null;
+  private URLNormalizers normalizers = null;
+  private SegmentMergeFilters mergeFilters = null;
+  private long sliceSize = -1;
+  private long curCount = 0;
+
+  /**
+   * Wraps inputs in an {@link MetaWrapper}, to permit merging different types
+   * in reduce and use additional metadata.
+   */
+  public static class ObjectInputFormat extends
+      SequenceFileInputFormat<Text, MetaWrapper> {
+
+    @Override
+    public RecordReader<Text, MetaWrapper> getRecordReader(
+        final InputSplit split, final JobConf job, Reporter reporter)
+        throws IOException {
+
+      reporter.setStatus(split.toString());
+
+      // find part name
+      SegmentPart segmentPart;
+      final String spString;
+      final FileSplit fSplit = (FileSplit) split;
+      try {
+        segmentPart = SegmentPart.get(fSplit);
+        spString = segmentPart.toString();
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot identify segment:", e);
+      }
+
+      SequenceFile.Reader reader = new SequenceFile.Reader(job, SequenceFile.Reader.file(fSplit.getPath()));
+
+      final Writable w;
+      try {
+        w = (Writable) reader.getValueClass().newInstance();
+      } catch (Exception e) {
+        throw new IOException(e.toString());
+      } finally {
+        try {
+          reader.close();
+        } catch (Exception e) {
+          // ignore
+        }
+      }
+      final SequenceFileRecordReader<Text, Writable> splitReader = new SequenceFileRecordReader<Text, Writable>(
+          job, (FileSplit) split);
+
+      try {
+        return new SequenceFileRecordReader<Text, MetaWrapper>(job, fSplit) {
+
+          public synchronized boolean next(Text key, MetaWrapper wrapper)
+              throws IOException {
+            LOG.debug("Running OIF.next()");
+
+            boolean res = splitReader.next(key, w);
+            wrapper.set(w);
+            wrapper.setMeta(SEGMENT_PART_KEY, spString);
+            return res;
+          }
+
+          @Override
+          public synchronized void close() throws IOException {
+            splitReader.close();
+          }
+
+          @Override
+          public MetaWrapper createValue() {
+            return new MetaWrapper();
+          }
+
+        };
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot create RecordReader: ", e);
+      }
+    }
+  }
+
+  public static class SegmentOutputFormat extends
+      FileOutputFormat<Text, MetaWrapper> {
+    private static final String DEFAULT_SLICE = "default";
+
+    @Override
+    public RecordWriter<Text, MetaWrapper> getRecordWriter(final FileSystem fs,
+        final JobConf job, final String name, final Progressable progress)
+        throws IOException {
+      return new RecordWriter<Text, MetaWrapper>() {
+        MapFile.Writer c_out = null;
+        MapFile.Writer f_out = null;
+        MapFile.Writer pd_out = null;
+        MapFile.Writer pt_out = null;
+        SequenceFile.Writer g_out = null;
+        SequenceFile.Writer p_out = null;
+        HashMap<String, Closeable> sliceWriters = new HashMap<String, Closeable>();
+        String segmentName = job.get("segment.merger.segmentName");
+
+        public void write(Text key, MetaWrapper wrapper) throws IOException {
+          // unwrap
+          SegmentPart sp = SegmentPart.parse(wrapper.getMeta(SEGMENT_PART_KEY));
+          Writable o = wrapper.get();
+          String slice = wrapper.getMeta(SEGMENT_SLICE_KEY);
+          if (o instanceof CrawlDatum) {
+            if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+              g_out = ensureSequenceFile(slice, CrawlDatum.GENERATE_DIR_NAME);
+              g_out.append(key, o);
+            } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
+              f_out = ensureMapFile(slice, CrawlDatum.FETCH_DIR_NAME,
+                  CrawlDatum.class);
+              f_out.append(key, o);
+            } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
+              p_out = ensureSequenceFile(slice, CrawlDatum.PARSE_DIR_NAME);
+              p_out.append(key, o);
+            } else {
+              throw new IOException("Cannot determine segment part: "
+                  + sp.partName);
+            }
+          } else if (o instanceof Content) {
+            c_out = ensureMapFile(slice, Content.DIR_NAME, Content.class);
+            c_out.append(key, o);
+          } else if (o instanceof ParseData) {
+            // update the segment name inside contentMeta - required by Indexer
+            if (slice == null) {
+              ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
+                  segmentName);
+            } else {
+              ((ParseData) o).getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
+                  segmentName + "-" + slice);
+            }
+            pd_out = ensureMapFile(slice, ParseData.DIR_NAME, ParseData.class);
+            pd_out.append(key, o);
+          } else if (o instanceof ParseText) {
+            pt_out = ensureMapFile(slice, ParseText.DIR_NAME, ParseText.class);
+            pt_out.append(key, o);
+          }
+        }
+
+        // lazily create SequenceFile-s.
+        private SequenceFile.Writer ensureSequenceFile(String slice,
+            String dirName) throws IOException {
+          if (slice == null)
+            slice = DEFAULT_SLICE;
+          SequenceFile.Writer res = (SequenceFile.Writer) sliceWriters
+              .get(slice + dirName);
+          if (res != null)
+            return res;
+          Path wname;
+          Path out = FileOutputFormat.getOutputPath(job);
+          if (slice == DEFAULT_SLICE) {
+            wname = new Path(new Path(new Path(out, segmentName), dirName),
+                name);
+          } else {
+            wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
+                dirName), name);
+          }
+          
+//          Option rKeyClassOpt = MapFile.Writer.keyClass(Text.class);
+//          org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = SequenceFile.Writer.valueClass(CrawlDatum.class);
+//          Option rProgressOpt = (Option) SequenceFile.Writer.progressable(progress);
+//          Option rCompOpt = (Option) SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(job));
+//          Option rFileOpt = (Option) SequenceFile.Writer.file(wname);
+          
+          //res = SequenceFile.createWriter(job, rFileOpt, rKeyClassOpt,
+           //   rValClassOpt, rCompOpt, rProgressOpt);
+          
+          res = SequenceFile.createWriter(job, SequenceFile.Writer.file(wname),
+              SequenceFile.Writer.keyClass(Text.class),
+              SequenceFile.Writer.valueClass(CrawlDatum.class),
+              SequenceFile.Writer.bufferSize(fs.getConf().getInt("io.file.buffer.size",4096)),
+              SequenceFile.Writer.replication(fs.getDefaultReplication(wname)),
+              SequenceFile.Writer.blockSize(1073741824),
+              SequenceFile.Writer.compression(SequenceFileOutputFormat.getOutputCompressionType(job), new DefaultCodec()),
+              SequenceFile.Writer.progressable(progress),
+              SequenceFile.Writer.metadata(new Metadata())); 
+          
+          sliceWriters.put(slice + dirName, res);
+          return res;
+        }
+
+        // lazily create MapFile-s.
+        private MapFile.Writer ensureMapFile(String slice, String dirName,
+            Class<? extends Writable> clazz) throws IOException {
+          if (slice == null)
+            slice = DEFAULT_SLICE;
+          MapFile.Writer res = (MapFile.Writer) sliceWriters.get(slice
+              + dirName);
+          if (res != null)
+            return res;
+          Path wname;
+          Path out = FileOutputFormat.getOutputPath(job);
+          if (slice == DEFAULT_SLICE) {
+            wname = new Path(new Path(new Path(out, segmentName), dirName),
+                name);
+          } else {
+            wname = new Path(new Path(new Path(out, segmentName + "-" + slice),
+                dirName), name);
+          }
+          CompressionType compType = SequenceFileOutputFormat
+              .getOutputCompressionType(job);
+          if (clazz.isAssignableFrom(ParseText.class)) {
+            compType = CompressionType.RECORD;
+          }
+          
+          Option rKeyClassOpt = (Option) MapFile.Writer.keyClass(Text.class);
+          org.apache.hadoop.io.SequenceFile.Writer.Option rValClassOpt = SequenceFile.Writer.valueClass(clazz);
+          org.apache.hadoop.io.SequenceFile.Writer.Option rProgressOpt = SequenceFile.Writer.progressable(progress);
+          org.apache.hadoop.io.SequenceFile.Writer.Option rCompOpt = SequenceFile.Writer.compression(compType);
+          
+          res = new MapFile.Writer(job, wname, rKeyClassOpt,
+              rValClassOpt, rCompOpt, rProgressOpt);
+          sliceWriters.put(slice + dirName, res);
+          return res;
+        }
+
+        public void close(Reporter reporter) throws IOException {
+          Iterator<Closeable> it = sliceWriters.values().iterator();
+          while (it.hasNext()) {
+            Object o = it.next();
+            if (o instanceof SequenceFile.Writer) {
+              ((SequenceFile.Writer) o).close();
+            } else {
+              ((MapFile.Writer) o).close();
+            }
+          }
+        }
+      };
+    }
+  }
+
+  public SegmentMerger() {
+    super(null);
+  }
+
+  public SegmentMerger(Configuration conf) {
+    super(conf);
+  }
+
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    if (conf == null)
+      return;
+    if (conf.getBoolean("segment.merger.filter", false)) {
+      filters = new URLFilters(conf);
+      mergeFilters = new SegmentMergeFilters(conf);
+    }
+    if (conf.getBoolean("segment.merger.normalizer", false))
+      normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_DEFAULT);
+    sliceSize = conf.getLong("segment.merger.slice", -1);
+    if ((sliceSize > 0) && (LOG.isInfoEnabled())) {
+      LOG.info("Slice size: " + sliceSize + " URLs.");
+    }
+  }
+
+  public void close() throws IOException {
+  }
+
+  public void configure(JobConf conf) {
+    setConf(conf);
+    if (sliceSize > 0) {
+      sliceSize = sliceSize / conf.getNumReduceTasks();
+    }
+  }
+
+  private Text newKey = new Text();
+
+  public void map(Text key, MetaWrapper value,
+      OutputCollector<Text, MetaWrapper> output, Reporter reporter)
+      throws IOException {
+    String url = key.toString();
+    if (normalizers != null) {
+      try {
+        url = normalizers.normalize(url, URLNormalizers.SCOPE_DEFAULT); // normalize
+                                                                        // the
+                                                                        // url
+      } catch (Exception e) {
+        LOG.warn("Skipping " + url + ":" + e.getMessage());
+        url = null;
+      }
+    }
+    if (url != null && filters != null) {
+      try {
+        url = filters.filter(url);
+      } catch (Exception e) {
+        LOG.warn("Skipping key " + url + ": " + e.getMessage());
+        url = null;
+      }
+    }
+    if (url != null) {
+      newKey.set(url);
+      output.collect(newKey, value);
+    }
+  }
+
+  /**
+   * NOTE: in selecting the latest version we rely exclusively on the segment
+   * name (not all segment data contain time information). Therefore it is
+   * extremely important that segments be named in an increasing lexicographic
+   * order as their creation time increases.
+   */
+  public void reduce(Text key, Iterator<MetaWrapper> values,
+      OutputCollector<Text, MetaWrapper> output, Reporter reporter)
+      throws IOException {
+    CrawlDatum lastG = null;
+    CrawlDatum lastF = null;
+    CrawlDatum lastSig = null;
+    Content lastC = null;
+    ParseData lastPD = null;
+    ParseText lastPT = null;
+    String lastGname = null;
+    String lastFname = null;
+    String lastSigname = null;
+    String lastCname = null;
+    String lastPDname = null;
+    String lastPTname = null;
+    TreeMap<String, ArrayList<CrawlDatum>> linked = new TreeMap<String, ArrayList<CrawlDatum>>();
+    while (values.hasNext()) {
+      MetaWrapper wrapper = values.next();
+      Object o = wrapper.get();
+      String spString = wrapper.getMeta(SEGMENT_PART_KEY);
+      if (spString == null) {
+        throw new IOException("Null segment part, key=" + key);
+      }
+      SegmentPart sp = SegmentPart.parse(spString);
+      if (o instanceof CrawlDatum) {
+        CrawlDatum val = (CrawlDatum) o;
+        // check which output dir it belongs to
+        if (sp.partName.equals(CrawlDatum.GENERATE_DIR_NAME)) {
+          if (lastG == null) {
+            lastG = val;
+            lastGname = sp.segmentName;
+          } else {
+            // take newer
+            if (lastGname.compareTo(sp.segmentName) < 0) {
+              lastG = val;
+              lastGname = sp.segmentName;
+            }
+          }
+        } else if (sp.partName.equals(CrawlDatum.FETCH_DIR_NAME)) {
+          // only consider fetch status and ignore fetch retry status
+          // https://issues.apache.org/jira/browse/NUTCH-1520
+          // https://issues.apache.org/jira/browse/NUTCH-1113
+          if (CrawlDatum.hasFetchStatus(val)
+              && val.getStatus() != CrawlDatum.STATUS_FETCH_RETRY
+              && val.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
+            if (lastF == null) {
+              lastF = val;
+              lastFname = sp.segmentName;
+            } else {
+              if (lastFname.compareTo(sp.segmentName) < 0) {
+                lastF = val;
+                lastFname = sp.segmentName;
+              }
+            }
+          }
+        } else if (sp.partName.equals(CrawlDatum.PARSE_DIR_NAME)) {
+          if (val.getStatus() == CrawlDatum.STATUS_SIGNATURE) {
+            if (lastSig == null) {
+              lastSig = val;
+              lastSigname = sp.segmentName;
+            } else {
+              // take newer
+              if (lastSigname.compareTo(sp.segmentName) < 0) {
+                lastSig = val;
+                lastSigname = sp.segmentName;
+              }
+            }
+            continue;
+          }
+          // collect all LINKED values from the latest segment
+          ArrayList<CrawlDatum> segLinked = linked.get(sp.segmentName);
+          if (segLinked == null) {
+            segLinked = new ArrayList<CrawlDatum>();
+            linked.put(sp.segmentName, segLinked);
+          }
+          segLinked.add(val);
+        } else {
+          throw new IOException("Cannot determine segment part: " + sp.partName);
+        }
+      } else if (o instanceof Content) {
+        if (lastC == null) {
+          lastC = (Content) o;
+          lastCname = sp.segmentName;
+        } else {
+          if (lastCname.compareTo(sp.segmentName) < 0) {
+            lastC = (Content) o;
+            lastCname = sp.segmentName;
+          }
+        }
+      } else if (o instanceof ParseData) {
+        if (lastPD == null) {
+          lastPD = (ParseData) o;
+          lastPDname = sp.segmentName;
+        } else {
+          if (lastPDname.compareTo(sp.segmentName) < 0) {
+            lastPD = (ParseData) o;
+            lastPDname = sp.segmentName;
+          }
+        }
+      } else if (o instanceof ParseText) {
+        if (lastPT == null) {
+          lastPT = (ParseText) o;
+          lastPTname = sp.segmentName;
+        } else {
+          if (lastPTname.compareTo(sp.segmentName) < 0) {
+            lastPT = (ParseText) o;
+            lastPTname = sp.segmentName;
+          }
+        }
+      }
+    }
+    // perform filtering based on full merge record
+    if (mergeFilters != null
+        && !mergeFilters.filter(key, lastG, lastF, lastSig, lastC, lastPD,
+            lastPT, linked.isEmpty() ? null : linked.lastEntry().getValue())) {
+      return;
+    }
+
+    curCount++;
+    String sliceName = null;
+    MetaWrapper wrapper = new MetaWrapper();
+    if (sliceSize > 0) {
+      sliceName = String.valueOf(curCount / sliceSize);
+      wrapper.setMeta(SEGMENT_SLICE_KEY, sliceName);
+    }
+    SegmentPart sp = new SegmentPart();
+    // now output the latest values
+    if (lastG != null) {
+      wrapper.set(lastG);
+      sp.partName = CrawlDatum.GENERATE_DIR_NAME;
+      sp.segmentName = lastGname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+      output.collect(key, wrapper);
+    }
+    if (lastF != null) {
+      wrapper.set(lastF);
+      sp.partName = CrawlDatum.FETCH_DIR_NAME;
+      sp.segmentName = lastFname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+      output.collect(key, wrapper);
+    }
+    if (lastSig != null) {
+      wrapper.set(lastSig);
+      sp.partName = CrawlDatum.PARSE_DIR_NAME;
+      sp.segmentName = lastSigname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+      output.collect(key, wrapper);
+    }
+    if (lastC != null) {
+      wrapper.set(lastC);
+      sp.partName = Content.DIR_NAME;
+      sp.segmentName = lastCname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+      output.collect(key, wrapper);
+    }
+    if (lastPD != null) {
+      wrapper.set(lastPD);
+      sp.partName = ParseData.DIR_NAME;
+      sp.segmentName = lastPDname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+      output.collect(key, wrapper);
+    }
+    if (lastPT != null) {
+      wrapper.set(lastPT);
+      sp.partName = ParseText.DIR_NAME;
+      sp.segmentName = lastPTname;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+      output.collect(key, wrapper);
+    }
+    if (linked.size() > 0) {
+      String name = linked.lastKey();
+      sp.partName = CrawlDatum.PARSE_DIR_NAME;
+      sp.segmentName = name;
+      wrapper.setMeta(SEGMENT_PART_KEY, sp.toString());
+      ArrayList<CrawlDatum> segLinked = linked.get(name);
+      for (int i = 0; i < segLinked.size(); i++) {
+        CrawlDatum link = segLinked.get(i);
+        wrapper.set(link);
+        output.collect(key, wrapper);
+      }
+    }
+  }
+
+  public void merge(Path out, Path[] segs, boolean filter, boolean normalize,
+      long slice) throws Exception {
+    String segmentName = Generator.generateSegmentName();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Merging " + segs.length + " segments to " + out + "/"
+          + segmentName);
+    }
+    JobConf job = new NutchJob(getConf());
+    job.setJobName("mergesegs " + out + "/" + segmentName);
+    job.setBoolean("segment.merger.filter", filter);
+    job.setBoolean("segment.merger.normalizer", normalize);
+    job.setLong("segment.merger.slice", slice);
+    job.set("segment.merger.segmentName", segmentName);
+    FileSystem fs = FileSystem.get(getConf());
+    // prepare the minimal common set of input dirs
+    boolean g = true;
+    boolean f = true;
+    boolean p = true;
+    boolean c = true;
+    boolean pd = true;
+    boolean pt = true;
+    
+    // These contain previous values, we use it to track changes in the loop
+    boolean pg = true;
+    boolean pf = true;
+    boolean pp = true;
+    boolean pc = true;
+    boolean ppd = true;
+    boolean ppt = true;
+    for (int i = 0; i < segs.length; i++) {
+      if (!fs.exists(segs[i])) {
+        if (LOG.isWarnEnabled()) {
+          LOG.warn("Input dir " + segs[i] + " doesn't exist, skipping.");
+        }
+        segs[i] = null;
+        continue;
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("SegmentMerger:   adding " + segs[i]);
+      }
+      Path cDir = new Path(segs[i], Content.DIR_NAME);
+      Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+      Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+      Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+      Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+      Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
+      c = c && fs.exists(cDir);
+      g = g && fs.exists(gDir);
+      f = f && fs.exists(fDir);
+      p = p && fs.exists(pDir);
+      pd = pd && fs.exists(pdDir);
+      pt = pt && fs.exists(ptDir);
+      
+      // Input changed?
+      if (g != pg || f != pf || p != pp || c != pc || pd != ppd || pt != ppt) {
+        LOG.info(segs[i] + " changed input dirs");
+      }
+      
+      pg = g; pf = f; pp = p; pc = c; ppd = pd; ppt = pt;
+    }
+    StringBuffer sb = new StringBuffer();
+    if (c)
+      sb.append(" " + Content.DIR_NAME);
+    if (g)
+      sb.append(" " + CrawlDatum.GENERATE_DIR_NAME);
+    if (f)
+      sb.append(" " + CrawlDatum.FETCH_DIR_NAME);
+    if (p)
+      sb.append(" " + CrawlDatum.PARSE_DIR_NAME);
+    if (pd)
+      sb.append(" " + ParseData.DIR_NAME);
+    if (pt)
+      sb.append(" " + ParseText.DIR_NAME);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("SegmentMerger: using segment data from:" + sb.toString());
+    }
+    for (int i = 0; i < segs.length; i++) {
+      if (segs[i] == null)
+        continue;
+      if (g) {
+        Path gDir = new Path(segs[i], CrawlDatum.GENERATE_DIR_NAME);
+        FileInputFormat.addInputPath(job, gDir);
+      }
+      if (c) {
+        Path cDir = new Path(segs[i], Content.DIR_NAME);
+        FileInputFormat.addInputPath(job, cDir);
+      }
+      if (f) {
+        Path fDir = new Path(segs[i], CrawlDatum.FETCH_DIR_NAME);
+        FileInputFormat.addInputPath(job, fDir);
+      }
+      if (p) {
+        Path pDir = new Path(segs[i], CrawlDatum.PARSE_DIR_NAME);
+        FileInputFormat.addInputPath(job, pDir);
+      }
+      if (pd) {
+        Path pdDir = new Path(segs[i], ParseData.DIR_NAME);
+        FileInputFormat.addInputPath(job, pdDir);
+      }
+      if (pt) {
+        Path ptDir = new Path(segs[i], ParseText.DIR_NAME);
+        FileInputFormat.addInputPath(job, ptDir);
+      }
+    }
+    job.setInputFormat(ObjectInputFormat.class);
+    job.setMapperClass(SegmentMerger.class);
+    job.setReducerClass(SegmentMerger.class);
+    FileOutputFormat.setOutputPath(job, out);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(MetaWrapper.class);
+    job.setOutputFormat(SegmentOutputFormat.class);
+
+    setConf(job);
+
+    JobClient.runJob(job);
+  }
+
+  /**
+   * @param args
+   */
+  public int run(String[] args)  throws Exception {
+    if (args.length < 2) {
+      System.err
+          .println("SegmentMerger output_dir (-dir segments | seg1 seg2 ...) [-filter] [-slice NNNN]");
+      System.err
+          .println("\toutput_dir\tname of the parent dir for output segment slice(s)");
+      System.err
+          .println("\t-dir segments\tparent dir containing several segments");
+      System.err.println("\tseg1 seg2 ...\tlist of segment dirs");
+      System.err
+          .println("\t-filter\t\tfilter out URL-s prohibited by current URLFilters");
+      System.err
+          .println("\t-normalize\t\tnormalize URL via current URLNormalizers");
+      System.err
+          .println("\t-slice NNNN\tcreate many output segments, each containing NNNN URLs");
+      return -1;
+    }
+    Configuration conf = NutchConfiguration.create();
+    final FileSystem fs = FileSystem.get(conf);
+    Path out = new Path(args[0]);
+    ArrayList<Path> segs = new ArrayList<Path>();
+    long sliceSize = 0;
+    boolean filter = false;
+    boolean normalize = false;
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-dir")) {
+        FileStatus[] fstats = fs.listStatus(new Path(args[++i]),
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+        Path[] files = HadoopFSUtil.getPaths(fstats);
+        for (int j = 0; j < files.length; j++)
+          segs.add(files[j]);
+      } else if (args[i].equals("-filter")) {
+        filter = true;
+      } else if (args[i].equals("-normalize")) {
+        normalize = true;
+      } else if (args[i].equals("-slice")) {
+        sliceSize = Long.parseLong(args[++i]);
+      } else {
+        segs.add(new Path(args[i]));
+      }
+    }
+    if (segs.size() == 0) {
+      System.err.println("ERROR: No input segments.");
+      return -1;
+    }
+
+    merge(out, segs.toArray(new Path[segs.size()]), filter, normalize,
+        sliceSize);
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(NutchConfiguration.create(),
+        new SegmentMerger(), args);
+    System.exit(result);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
new file mode 100644
index 0000000..84247e4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentPart.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileSplit;
+
+/**
+ * Utility class for handling information about segment parts.
+ * 
+ * @author Andrzej Bialecki
+ */
+public class SegmentPart {
+  /** Name of the segment (just the last path component). */
+  public String segmentName;
+  /** Name of the segment part (ie. one of subdirectories inside a segment). */
+  public String partName;
+
+  public SegmentPart() {
+
+  }
+
+  public SegmentPart(String segmentName, String partName) {
+    this.segmentName = segmentName;
+    this.partName = partName;
+  }
+
+  /**
+   * Return a String representation of this class, in the form
+   * "segmentName/partName".
+   */
+  public String toString() {
+    return segmentName + "/" + partName;
+  }
+
+  /**
+   * Create SegmentPart from a FileSplit.
+   * 
+   * @param split
+   * @return A {@link SegmentPart} resultant from a {@link FileSplit}.
+   * @throws Exception
+   */
+  public static SegmentPart get(FileSplit split) throws IOException {
+    return get(split.getPath().toString());
+  }
+
+  /**
+   * Create SegmentPart from a full path of a location inside any segment part.
+   * 
+   * @param path
+   *          full path into a segment part (may include "part-xxxxx"
+   *          components)
+   * @return SegmentPart instance describing this part.
+   * @throws IOException
+   *           if any required path components are missing.
+   */
+  public static SegmentPart get(String path) throws IOException {
+    // find part name
+    String dir = path.replace('\\', '/');
+    int idx = dir.lastIndexOf("/part-");
+    if (idx == -1) {
+      throw new IOException("Cannot determine segment part: " + dir);
+    }
+    dir = dir.substring(0, idx);
+    idx = dir.lastIndexOf('/');
+    if (idx == -1) {
+      throw new IOException("Cannot determine segment part: " + dir);
+    }
+    String part = dir.substring(idx + 1);
+    // find segment name
+    dir = dir.substring(0, idx);
+    idx = dir.lastIndexOf('/');
+    if (idx == -1) {
+      throw new IOException("Cannot determine segment name: " + dir);
+    }
+    String segment = dir.substring(idx + 1);
+    return new SegmentPart(segment, part);
+  }
+
+  /**
+   * Create SegmentPart from a String in format "segmentName/partName".
+   * 
+   * @param string
+   *          input String
+   * @return parsed instance of SegmentPart
+   * @throws IOException
+   *           if "/" is missing.
+   */
+  public static SegmentPart parse(String string) throws IOException {
+    int idx = string.indexOf('/');
+    if (idx == -1) {
+      throw new IOException("Invalid SegmentPart: '" + string + "'");
+    }
+    String segment = string.substring(0, idx);
+    String part = string.substring(idx + 1);
+    return new SegmentPart(segment, part);
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java
new file mode 100644
index 0000000..d00d1e2
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/SegmentReader.java
@@ -0,0 +1,719 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.segment;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.Writer;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapFileOutputFormat;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.util.Progressable;
+import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.nutch.crawl.NutchWritable;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+import org.apache.nutch.protocol.Content;
+import org.apache.nutch.util.HadoopFSUtil;
+import org.apache.nutch.util.NutchConfiguration;
+import org.apache.nutch.util.NutchJob;
+
+/** Dump the content of a segment. */
+public class SegmentReader extends Configured implements
+    Reducer<Text, NutchWritable, Text, Text> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(SegmentReader.class);
+
+  long recNo = 0L;
+
+  private boolean co, fe, ge, pa, pd, pt;
+  private FileSystem fs;
+
+  public static class InputCompatMapper extends MapReduceBase implements
+      Mapper<WritableComparable<?>, Writable, Text, NutchWritable> {
+    private Text newKey = new Text();
+
+    public void map(WritableComparable<?> key, Writable value,
+        OutputCollector<Text, NutchWritable> collector, Reporter reporter)
+        throws IOException {
+      // convert on the fly from old formats with UTF8 keys.
+      // UTF8 deprecated and replaced by Text.
+      if (key instanceof Text) {
+        newKey.set(key.toString());
+        key = newKey;
+      }
+      collector.collect((Text) key, new NutchWritable(value));
+    }
+
+  }
+
+  /** Implements a text output format */
+  public static class TextOutputFormat extends
+      FileOutputFormat<WritableComparable<?>, Writable> {
+    public RecordWriter<WritableComparable<?>, Writable> getRecordWriter(
+        final FileSystem fs, JobConf job, String name,
+        final Progressable progress) throws IOException {
+
+      final Path segmentDumpFile = new Path(
+          FileOutputFormat.getOutputPath(job), name);
+
+      // Get the old copy out of the way
+      if (fs.exists(segmentDumpFile))
+        fs.delete(segmentDumpFile, true);
+
+      final PrintStream printStream = new PrintStream(
+          fs.create(segmentDumpFile));
+      return new RecordWriter<WritableComparable<?>, Writable>() {
+        public synchronized void write(WritableComparable<?> key, Writable value)
+            throws IOException {
+          printStream.println(value);
+        }
+
+        public synchronized void close(Reporter reporter) throws IOException {
+          printStream.close();
+        }
+      };
+    }
+  }
+
+  public SegmentReader() {
+    super(null);
+  }
+
+  public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge,
+      boolean pa, boolean pd, boolean pt) {
+    super(conf);
+    this.co = co;
+    this.fe = fe;
+    this.ge = ge;
+    this.pa = pa;
+    this.pd = pd;
+    this.pt = pt;
+    try {
+      this.fs = FileSystem.get(getConf());
+    } catch (IOException e) {
+      LOG.error("IOException:", e);
+    }
+  }
+
+  public void configure(JobConf job) {
+    setConf(job);
+    this.co = getConf().getBoolean("segment.reader.co", true);
+    this.fe = getConf().getBoolean("segment.reader.fe", true);
+    this.ge = getConf().getBoolean("segment.reader.ge", true);
+    this.pa = getConf().getBoolean("segment.reader.pa", true);
+    this.pd = getConf().getBoolean("segment.reader.pd", true);
+    this.pt = getConf().getBoolean("segment.reader.pt", true);
+    try {
+      this.fs = FileSystem.get(getConf());
+    } catch (IOException e) {
+      LOG.error("IOException:", e);
+    }
+  }
+
+  private JobConf createJobConf() {
+    JobConf job = new NutchJob(getConf());
+    job.setBoolean("segment.reader.co", this.co);
+    job.setBoolean("segment.reader.fe", this.fe);
+    job.setBoolean("segment.reader.ge", this.ge);
+    job.setBoolean("segment.reader.pa", this.pa);
+    job.setBoolean("segment.reader.pd", this.pd);
+    job.setBoolean("segment.reader.pt", this.pt);
+    return job;
+  }
+
+  public void close() {
+  }
+
+  public void reduce(Text key, Iterator<NutchWritable> values,
+      OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+    StringBuffer dump = new StringBuffer();
+
+    dump.append("\nRecno:: ").append(recNo++).append("\n");
+    dump.append("URL:: " + key.toString() + "\n");
+    while (values.hasNext()) {
+      Writable value = values.next().get(); // unwrap
+      if (value instanceof CrawlDatum) {
+        dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
+      } else if (value instanceof Content) {
+        dump.append("\nContent::\n").append(((Content) value).toString());
+      } else if (value instanceof ParseData) {
+        dump.append("\nParseData::\n").append(((ParseData) value).toString());
+      } else if (value instanceof ParseText) {
+        dump.append("\nParseText::\n").append(((ParseText) value).toString());
+      } else if (LOG.isWarnEnabled()) {
+        LOG.warn("Unrecognized type: " + value.getClass());
+      }
+    }
+    output.collect(key, new Text(dump.toString()));
+  }
+
+  public void dump(Path segment, Path output) throws IOException {
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("SegmentReader: dump segment: " + segment);
+    }
+
+    JobConf job = createJobConf();
+    job.setJobName("read " + segment);
+
+    if (ge)
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.GENERATE_DIR_NAME));
+    if (fe)
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.FETCH_DIR_NAME));
+    if (pa)
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.PARSE_DIR_NAME));
+    if (co)
+      FileInputFormat.addInputPath(job, new Path(segment, Content.DIR_NAME));
+    if (pd)
+      FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
+    if (pt)
+      FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
+
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setMapperClass(InputCompatMapper.class);
+    job.setReducerClass(SegmentReader.class);
+
+    Path tempDir = new Path(job.get("hadoop.tmp.dir", "/tmp") + "/segread-"
+        + new java.util.Random().nextInt());
+    fs.delete(tempDir, true);
+
+    FileOutputFormat.setOutputPath(job, tempDir);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
+
+    JobClient.runJob(job);
+
+    // concatenate the output
+    Path dumpFile = new Path(output, job.get("segment.dump.dir", "dump"));
+
+    // remove the old file
+    fs.delete(dumpFile, true);
+    FileStatus[] fstats = fs.listStatus(tempDir,
+        HadoopFSUtil.getPassAllFilter());
+    Path[] files = HadoopFSUtil.getPaths(fstats);
+
+    PrintWriter writer = null;
+    int currentRecordNumber = 0;
+    if (files.length > 0) {
+      writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
+          fs.create(dumpFile))));
+      try {
+        for (int i = 0; i < files.length; i++) {
+          Path partFile = files[i];
+          try {
+            currentRecordNumber = append(fs, job, partFile, writer,
+                currentRecordNumber);
+          } catch (IOException exception) {
+            if (LOG.isWarnEnabled()) {
+              LOG.warn("Couldn't copy the content of " + partFile.toString()
+                  + " into " + dumpFile.toString());
+              LOG.warn(exception.getMessage());
+            }
+          }
+        }
+      } finally {
+        writer.close();
+      }
+    }
+    fs.delete(tempDir, true);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("SegmentReader: done");
+    }
+  }
+
+  /** Appends two files and updates the Recno counter */
+  private int append(FileSystem fs, Configuration conf, Path src,
+      PrintWriter writer, int currentRecordNumber) throws IOException {
+    BufferedReader reader = new BufferedReader(new InputStreamReader(
+        fs.open(src)));
+    try {
+      String line = reader.readLine();
+      while (line != null) {
+        if (line.startsWith("Recno:: ")) {
+          line = "Recno:: " + currentRecordNumber++;
+        }
+        writer.println(line);
+        line = reader.readLine();
+      }
+      return currentRecordNumber;
+    } finally {
+      reader.close();
+    }
+  }
+
+  private static final String[][] keys = new String[][] {
+      { "co", "Content::\n" }, { "ge", "Crawl Generate::\n" },
+      { "fe", "Crawl Fetch::\n" }, { "pa", "Crawl Parse::\n" },
+      { "pd", "ParseData::\n" }, { "pt", "ParseText::\n" } };
+
+  public void get(final Path segment, final Text key, Writer writer,
+      final Map<String, List<Writable>> results) throws Exception {
+    LOG.info("SegmentReader: get '" + key + "'");
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    if (co)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                Content.DIR_NAME), key);
+            results.put("co", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (fe)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                CrawlDatum.FETCH_DIR_NAME), key);
+            results.put("fe", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (ge)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getSeqRecords(new Path(segment,
+                CrawlDatum.GENERATE_DIR_NAME), key);
+            results.put("ge", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (pa)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getSeqRecords(new Path(segment,
+                CrawlDatum.PARSE_DIR_NAME), key);
+            results.put("pa", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (pd)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                ParseData.DIR_NAME), key);
+            results.put("pd", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    if (pt)
+      threads.add(new Thread() {
+        public void run() {
+          try {
+            List<Writable> res = getMapRecords(new Path(segment,
+                ParseText.DIR_NAME), key);
+            results.put("pt", res);
+          } catch (Exception e) {
+            LOG.error("Exception:", e);
+          }
+        }
+      });
+    Iterator<Thread> it = threads.iterator();
+    while (it.hasNext())
+      it.next().start();
+    int cnt;
+    do {
+      cnt = 0;
+      try {
+        Thread.sleep(5000);
+      } catch (Exception e) {
+      }
+      ;
+      it = threads.iterator();
+      while (it.hasNext()) {
+        if (it.next().isAlive())
+          cnt++;
+      }
+      if ((cnt > 0) && (LOG.isDebugEnabled())) {
+        LOG.debug("(" + cnt + " to retrieve)");
+      }
+    } while (cnt > 0);
+    for (int i = 0; i < keys.length; i++) {
+      List<Writable> res = results.get(keys[i][0]);
+      if (res != null && res.size() > 0) {
+        for (int k = 0; k < res.size(); k++) {
+          writer.write(keys[i][1]);
+          writer.write(res.get(k) + "\n");
+        }
+      }
+      writer.flush();
+    }
+  }
+
+  private List<Writable> getMapRecords(Path dir, Text key) throws Exception {
+    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir,
+        getConf());
+    ArrayList<Writable> res = new ArrayList<Writable>();
+    Class<?> keyClass = readers[0].getKeyClass();
+    Class<?> valueClass = readers[0].getValueClass();
+    if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
+      throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+    Writable value = (Writable) valueClass.newInstance();
+    // we don't know the partitioning schema
+    for (int i = 0; i < readers.length; i++) {
+      if (readers[i].get(key, value) != null) {
+        res.add(value);
+        value = (Writable) valueClass.newInstance();
+        Text aKey = (Text) keyClass.newInstance();
+        while (readers[i].next(aKey, value) && aKey.equals(key)) {
+          res.add(value);
+          value = (Writable) valueClass.newInstance();
+        }
+      }
+      readers[i].close();
+    }
+    return res;
+  }
+
+  private List<Writable> getSeqRecords(Path dir, Text key) throws Exception {
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(
+        getConf(), dir);
+    ArrayList<Writable> res = new ArrayList<Writable>();
+    Class<?> keyClass = readers[0].getKeyClass();
+    Class<?> valueClass = readers[0].getValueClass();
+    if (!keyClass.getName().equals("org.apache.hadoop.io.Text"))
+      throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+    Writable aKey = (Writable) keyClass.newInstance();
+    Writable value = (Writable) valueClass.newInstance();
+    for (int i = 0; i < readers.length; i++) {
+      while (readers[i].next(aKey, value)) {
+        if (aKey.equals(key)) {
+          res.add(value);
+          value = (Writable) valueClass.newInstance();
+        }
+      }
+      readers[i].close();
+    }
+    return res;
+  }
+
+  public static class SegmentReaderStats {
+    public long start = -1L;
+    public long end = -1L;
+    public long generated = -1L;
+    public long fetched = -1L;
+    public long fetchErrors = -1L;
+    public long parsed = -1L;
+    public long parseErrors = -1L;
+  }
+
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+
+  public void list(List<Path> dirs, Writer writer) throws Exception {
+    writer
+        .write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n");
+    for (int i = 0; i < dirs.size(); i++) {
+      Path dir = dirs.get(i);
+      SegmentReaderStats stats = new SegmentReaderStats();
+      getStats(dir, stats);
+      writer.write(dir.getName() + "\t");
+      if (stats.generated == -1)
+        writer.write("?");
+      else
+        writer.write(stats.generated + "");
+      writer.write("\t\t");
+      if (stats.start == -1)
+        writer.write("?\t");
+      else
+        writer.write(sdf.format(new Date(stats.start)));
+      writer.write("\t");
+      if (stats.end == -1)
+        writer.write("?");
+      else
+        writer.write(sdf.format(new Date(stats.end)));
+      writer.write("\t");
+      if (stats.fetched == -1)
+        writer.write("?");
+      else
+        writer.write(stats.fetched + "");
+      writer.write("\t");
+      if (stats.parsed == -1)
+        writer.write("?");
+      else
+        writer.write(stats.parsed + "");
+      writer.write("\n");
+      writer.flush();
+    }
+  }
+
+  public void getStats(Path segment, final SegmentReaderStats stats)
+      throws Exception {
+    long cnt = 0L;
+    Text key = new Text();
+    
+    if (ge) {
+      SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(
+          getConf(), new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+      for (int i = 0; i < readers.length; i++) {
+        while (readers[i].next(key))
+          cnt++;
+        readers[i].close();
+      }
+      stats.generated = cnt;
+    }
+    
+    if (fe) {
+      Path fetchDir = new Path(segment, CrawlDatum.FETCH_DIR_NAME);
+      if (fs.exists(fetchDir) && fs.getFileStatus(fetchDir).isDirectory()) {
+        cnt = 0L;
+        long start = Long.MAX_VALUE;
+        long end = Long.MIN_VALUE;
+        CrawlDatum value = new CrawlDatum();
+        MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir,
+            getConf());
+        for (int i = 0; i < mreaders.length; i++) {
+          while (mreaders[i].next(key, value)) {
+            cnt++;
+            if (value.getFetchTime() < start)
+              start = value.getFetchTime();
+            if (value.getFetchTime() > end)
+              end = value.getFetchTime();
+          }
+          mreaders[i].close();
+        }
+        stats.start = start;
+        stats.end = end;
+        stats.fetched = cnt;
+      }
+    }
+    
+    if (pd) {
+      Path parseDir = new Path(segment, ParseData.DIR_NAME);
+      if (fs.exists(parseDir) && fs.getFileStatus(parseDir).isDirectory()) {
+        cnt = 0L;
+        long errors = 0L;
+        ParseData value = new ParseData();
+        MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir,
+            getConf());
+        for (int i = 0; i < mreaders.length; i++) {
+          while (mreaders[i].next(key, value)) {
+            cnt++;
+            if (!value.getStatus().isSuccess())
+              errors++;
+          }
+          mreaders[i].close();
+        }
+        stats.parsed = cnt;
+        stats.parseErrors = errors;
+      }
+    }
+  }
+
+  private static final int MODE_DUMP = 0;
+
+  private static final int MODE_LIST = 1;
+
+  private static final int MODE_GET = 2;
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return;
+    }
+    int mode = -1;
+    if (args[0].equals("-dump"))
+      mode = MODE_DUMP;
+    else if (args[0].equals("-list"))
+      mode = MODE_LIST;
+    else if (args[0].equals("-get"))
+      mode = MODE_GET;
+
+    boolean co = true;
+    boolean fe = true;
+    boolean ge = true;
+    boolean pa = true;
+    boolean pd = true;
+    boolean pt = true;
+    // collect general options
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-nocontent")) {
+        co = false;
+        args[i] = null;
+      } else if (args[i].equals("-nofetch")) {
+        fe = false;
+        args[i] = null;
+      } else if (args[i].equals("-nogenerate")) {
+        ge = false;
+        args[i] = null;
+      } else if (args[i].equals("-noparse")) {
+        pa = false;
+        args[i] = null;
+      } else if (args[i].equals("-noparsedata")) {
+        pd = false;
+        args[i] = null;
+      } else if (args[i].equals("-noparsetext")) {
+        pt = false;
+        args[i] = null;
+      }
+    }
+    Configuration conf = NutchConfiguration.create();
+    final FileSystem fs = FileSystem.get(conf);
+    SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd,
+        pt);
+    // collect required args
+    switch (mode) {
+    case MODE_DUMP:
+      String input = args[1];
+      if (input == null) {
+        System.err.println("Missing required argument: <segment_dir>");
+        usage();
+        return;
+      }
+      String output = args.length > 2 ? args[2] : null;
+      if (output == null) {
+        System.err.println("Missing required argument: <output>");
+        usage();
+        return;
+      }
+      segmentReader.dump(new Path(input), new Path(output));
+      return;
+    case MODE_LIST:
+      ArrayList<Path> dirs = new ArrayList<Path>();
+      for (int i = 1; i < args.length; i++) {
+        if (args[i] == null)
+          continue;
+        if (args[i].equals("-dir")) {
+          Path dir = new Path(args[++i]);
+          FileStatus[] fstats = fs.listStatus(dir,
+              HadoopFSUtil.getPassDirectoriesFilter(fs));
+          Path[] files = HadoopFSUtil.getPaths(fstats);
+          if (files != null && files.length > 0) {
+            dirs.addAll(Arrays.asList(files));
+          }
+        } else
+          dirs.add(new Path(args[i]));
+      }
+      segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
+      return;
+    case MODE_GET:
+      input = args[1];
+      if (input == null) {
+        System.err.println("Missing required argument: <segment_dir>");
+        usage();
+        return;
+      }
+      String key = args.length > 2 ? args[2] : null;
+      if (key == null) {
+        System.err.println("Missing required argument: <keyValue>");
+        usage();
+        return;
+      }
+      segmentReader.get(new Path(input), new Text(key), new OutputStreamWriter(
+          System.out, "UTF-8"), new HashMap<String, List<Writable>>());
+      return;
+    default:
+      System.err.println("Invalid operation: " + args[0]);
+      usage();
+      return;
+    }
+  }
+
+  private static void usage() {
+    System.err
+        .println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n");
+    System.err.println("* General options:");
+    System.err.println("\t-nocontent\tignore content directory");
+    System.err.println("\t-nofetch\tignore crawl_fetch directory");
+    System.err.println("\t-nogenerate\tignore crawl_generate directory");
+    System.err.println("\t-noparse\tignore crawl_parse directory");
+    System.err.println("\t-noparsedata\tignore parse_data directory");
+    System.err.println("\t-noparsetext\tignore parse_text directory");
+    System.err.println();
+    System.err
+        .println("* SegmentReader -dump <segment_dir> <output> [general options]");
+    System.err
+        .println("  Dumps content of a <segment_dir> as a text file to <output>.\n");
+    System.err.println("\t<segment_dir>\tname of the segment directory.");
+    System.err
+        .println("\t<output>\tname of the (non-existent) output directory.");
+    System.err.println();
+    System.err
+        .println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]");
+    System.err
+        .println("  List a synopsis of segments in specified directories, or all segments in");
+    System.err
+        .println("  a directory <segments>, and print it on System.out\n");
+    System.err
+        .println("\t<segment_dir1> ...\tlist of segment directories to process");
+    System.err
+        .println("\t-dir <segments>\t\tdirectory that contains multiple segments");
+    System.err.println();
+    System.err
+        .println("* SegmentReader -get <segment_dir> <keyValue> [general options]");
+    System.err
+        .println("  Get a specified record from a segment, and print it on System.out.\n");
+    System.err.println("\t<segment_dir>\tname of the segment directory.");
+    System.err.println("\t<keyValue>\tvalue of the key (url).");
+    System.err
+        .println("\t\tNote: put double-quotes around strings with spaces.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java b/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java
new file mode 100644
index 0000000..ecc0c26
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/segment/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A segment stores all data from on generate/fetch/update cycle:
+ * fetch list, protocol status, raw content, parsed content, and extracted outgoing links.
+ */
+package org.apache.nutch.segment;
+

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java b/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java
new file mode 100644
index 0000000..c71cfa9
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/ConfManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.model.request.NutchConfig;
+
+public interface ConfManager {
+
+  public Configuration get(String confId);
+
+  public Map<String, String> getAsMap(String confId);
+
+  public void setProperty(String confId, String propName, String propValue);
+
+  public Set<String> list();
+
+  public String create(NutchConfig nutchConfig);
+
+  public void delete(String confId);
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java b/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java
new file mode 100644
index 0000000..20346fc
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/JobManager.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service;
+
+import java.util.Collection;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+
+public interface JobManager {
+
+  public static enum JobType{
+    INJECT, GENERATE, FETCH, PARSE, UPDATEDB, INDEX, READDB, CLASS, INVERTLINKS, DEDUP
+  };
+  public Collection<JobInfo> list(String crawlId, State state);
+
+  public JobInfo get(String crawlId, String id);
+
+  /**
+   * Creates specified job
+   * @param jobConfig
+   * @return JobInfo
+   */
+  public JobInfo create(JobConfig jobConfig);
+
+  public boolean abort(String crawlId, String id);
+
+  public boolean stop(String crawlId, String id);
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java b/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java
new file mode 100644
index 0000000..00bb78f
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/NutchReader.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service;
+
+import java.io.FileNotFoundException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.impl.SequenceReader;
+import org.apache.nutch.util.NutchConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public interface  NutchReader {
+  
+  public static final Logger LOG = LoggerFactory.getLogger(NutchReader.class);
+  public static final Configuration conf = NutchConfiguration.create();
+  
+  public List read(String path) throws FileNotFoundException;
+  public List head(String path, int nrows) throws FileNotFoundException;
+  public List slice(String path, int start, int end) throws FileNotFoundException;
+  public int count(String path) throws FileNotFoundException;
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java b/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java
new file mode 100644
index 0000000..e206707
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/NutchServer.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
+
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.cli.CommandLine;
+import org.apache.cxf.binding.BindingFactoryManager;
+import org.apache.cxf.jaxrs.JAXRSBindingFactory;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.ResourceProvider;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+import org.apache.nutch.fetcher.FetchNodeDb;
+import org.apache.nutch.service.impl.ConfManagerImpl;
+import org.apache.nutch.service.impl.JobFactory;
+import org.apache.nutch.service.impl.JobManagerImpl;
+import org.apache.nutch.service.impl.NutchServerPoolExecutor;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.service.resources.AdminResource;
+import org.apache.nutch.service.resources.ConfigResource;
+import org.apache.nutch.service.resources.DbResource;
+import org.apache.nutch.service.resources.JobResource;
+import org.apache.nutch.service.resources.ReaderResouce;
+import org.apache.nutch.service.resources.SeedResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Queues;
+
+public class NutchServer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NutchServer.class);
+
+  private static final String LOCALHOST = "localhost";
+  private static final Integer DEFAULT_PORT = 8081;
+  private static final int JOB_CAPACITY = 100;
+
+  private static Integer port = DEFAULT_PORT;
+  private static String host  = LOCALHOST;
+
+  private static final String CMD_HELP = "help";
+  private static final String CMD_PORT = "port";
+  private static final String CMD_HOST = "host";
+
+  private long started;
+  private boolean running;
+  private ConfManager configManager;
+  private JobManager jobManager;
+  private JAXRSServerFactoryBean sf; 
+
+  private static FetchNodeDb fetchNodeDb;
+
+  private static NutchServer server;
+
+  static {
+    server = new NutchServer();
+  }
+
+  private NutchServer() {
+    configManager = new ConfManagerImpl();
+    BlockingQueue<Runnable> runnables = Queues.newArrayBlockingQueue(JOB_CAPACITY);
+    NutchServerPoolExecutor executor = new NutchServerPoolExecutor(10, JOB_CAPACITY, 1, TimeUnit.HOURS, runnables);
+    jobManager = new JobManagerImpl(new JobFactory(), configManager, executor);
+    fetchNodeDb = FetchNodeDb.getInstance();
+
+    sf = new JAXRSServerFactoryBean();
+    BindingFactoryManager manager = sf.getBus().getExtension(BindingFactoryManager.class);
+    JAXRSBindingFactory factory = new JAXRSBindingFactory();
+    factory.setBus(sf.getBus());
+    manager.registerBindingFactory(JAXRSBindingFactory.JAXRS_BINDING_ID, factory);
+    sf.setResourceClasses(getClasses());
+    sf.setResourceProviders(getResourceProviders());
+    sf.setProvider(new JacksonJaxbJsonProvider());
+
+  }
+
+  public static NutchServer getInstance() {
+    return server;
+  }
+
+  protected static void startServer() {
+    server.start();
+  }
+
+  private void start() {
+    LOG.info("Starting NutchServer on {}:{}  ...", host, port);
+    try{
+      String address = "http://" + host + ":" + port;
+      sf.setAddress(address);
+      sf.create();
+    }catch(Exception e){
+      throw new IllegalStateException("Server could not be started", e);
+    }
+
+    started = System.currentTimeMillis();
+    running = true;
+    LOG.info("Started Nutch Server on {}:{} at {}", new Object[] {host, port, started});
+  }
+
+  private List<Class<?>> getClasses() {
+    List<Class<?>> resources = new ArrayList<Class<?>>();
+    resources.add(JobResource.class);
+    resources.add(ConfigResource.class);
+    resources.add(DbResource.class);
+    resources.add(AdminResource.class);
+    resources.add(SeedResource.class);
+    resources.add(ReaderResouce.class);
+    return resources;
+  }
+
+  private List<ResourceProvider> getResourceProviders() {
+    List<ResourceProvider> resourceProviders = new ArrayList<ResourceProvider>();
+    resourceProviders.add(new SingletonResourceProvider(getConfManager()));
+    return resourceProviders;
+  }
+
+  public ConfManager getConfManager() {
+    return configManager;
+  }
+
+  public JobManager getJobManager() {
+    return jobManager;
+  }
+
+  public FetchNodeDb getFetchNodeDb(){
+    return fetchNodeDb;
+  }
+
+  public boolean isRunning(){
+    return running;
+  }
+
+  public long getStarted(){
+    return started;
+  }
+
+  public static void main(String[] args) throws ParseException {
+    CommandLineParser parser = new PosixParser();
+    Options options = createOptions();
+    CommandLine commandLine = parser.parse(options, args);
+    if (commandLine.hasOption(CMD_HELP)) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp("NutchServer", options, true);
+      return;
+    }
+
+    if (commandLine.hasOption(CMD_PORT)) {
+      port = Integer.parseInt(commandLine.getOptionValue(CMD_PORT));
+    }
+
+    if (commandLine.hasOption(CMD_HOST)) {
+      host = commandLine.getOptionValue(CMD_HOST);
+    }
+
+    startServer();
+  }
+
+  private static Options createOptions() {
+    Options options = new Options();
+
+    OptionBuilder.withDescription("Show this help");
+    options.addOption(OptionBuilder.create(CMD_HELP));
+
+    OptionBuilder.withArgName("port");
+    OptionBuilder.hasOptionalArg();
+    OptionBuilder.withDescription("The port to run the Nutch Server. Default port 8081");
+    options.addOption(OptionBuilder.create(CMD_PORT));
+
+    OptionBuilder.withArgName("host");
+    OptionBuilder.hasOptionalArg();
+    OptionBuilder.withDescription("The host to bind the Nutch Server to. Default is localhost.");
+    options.addOption(OptionBuilder.create(CMD_HOST));
+
+    return options;
+  }
+
+  public boolean canStop(boolean force){
+    if(force)
+      return true;
+
+    Collection<JobInfo> jobs = getJobManager().list(null, State.RUNNING);
+    return jobs.isEmpty();
+  }
+
+  protected static void setPort(int port) {
+	  NutchServer.port = port;
+  }
+  
+  public int getPort() {
+    return port;
+  }
+
+  public void stop() {
+    System.exit(0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java
new file mode 100644
index 0000000..0c08ce4
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/ConfManagerImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.ConfManager;
+import org.apache.nutch.service.model.request.NutchConfig;
+import org.apache.nutch.service.resources.ConfigResource;
+import org.apache.nutch.util.NutchConfiguration;
+
+import com.google.common.collect.Maps;
+
+public class ConfManagerImpl implements ConfManager {
+
+
+  private Map<String, Configuration> configurations = Maps.newConcurrentMap();
+
+  private AtomicInteger newConfigId = new AtomicInteger();
+
+  public ConfManagerImpl() {
+    configurations.put(ConfigResource.DEFAULT, NutchConfiguration.create());
+  }
+
+  /**
+   * Returns the configuration associatedConfManagerImpl with the given confId
+   */
+  public Configuration get(String confId) {
+    if (confId == null) {
+      return configurations.get(ConfigResource.DEFAULT);
+    }
+    return configurations.get(confId);
+  }
+
+  public Map<String, String> getAsMap(String confId) {
+    Configuration configuration = configurations.get(confId);
+    if (configuration == null) {
+      return Collections.emptyMap();
+    }
+
+    Iterator<Entry<String, String>> iterator = configuration.iterator();
+    Map<String, String> configMap = Maps.newTreeMap();
+    while (iterator.hasNext()) {
+      Entry<String, String> entry = iterator.next();
+      configMap.put(entry.getKey(), entry.getValue());
+    }
+    return configMap;
+  }
+
+  /**
+   * Sets the given property in the configuration associated with the confId
+   */
+  public void setProperty(String confId, String propName, String propValue) {
+    if (!configurations.containsKey(confId)) {
+      throw new IllegalArgumentException("Unknown configId '" + confId + "'");
+    }
+    Configuration conf = configurations.get(confId);
+    conf.set(propName, propValue);
+  }
+
+  public Set<String> list() {
+    return configurations.keySet();
+  }
+
+  /**
+   * Created a new configuration based on the values provided.
+   * @param NutchConfig
+   * @return String - confId
+   */
+  public String create(NutchConfig nutchConfig) {
+    if (StringUtils.isBlank(nutchConfig.getConfigId())) {
+      nutchConfig.setConfigId(String.valueOf(newConfigId.incrementAndGet()));
+    }
+
+    if (!canCreate(nutchConfig)) {
+      throw new IllegalArgumentException("Config already exists.");
+    }
+
+    createHadoopConfig(nutchConfig);
+    return nutchConfig.getConfigId();
+  }
+
+
+  public void delete(String confId) {
+    configurations.remove(confId);
+  }
+
+  private boolean canCreate(NutchConfig nutchConfig) {
+    if (nutchConfig.isForce()) {
+      return true;
+    }
+    if (!configurations.containsKey(nutchConfig.getConfigId())) {
+      return true;
+    }
+    return false;
+  }
+
+  private void createHadoopConfig(NutchConfig nutchConfig) {
+    Configuration conf = NutchConfiguration.create();
+    configurations.put(nutchConfig.getConfigId(), conf);
+
+    if (MapUtils.isEmpty(nutchConfig.getParams())) {
+      return;
+    }
+    for (Entry<String, String> e : nutchConfig.getParams().entrySet()) {
+      conf.set(e.getKey(), e.getValue());
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java
new file mode 100644
index 0000000..a74e362
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobFactory.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.service.impl;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.nutch.service.JobManager.JobType;
+import org.apache.nutch.crawl.CrawlDb;
+import org.apache.nutch.crawl.DeduplicationJob;
+import org.apache.nutch.crawl.Generator;
+import org.apache.nutch.crawl.Injector;
+import org.apache.nutch.crawl.LinkDb;
+import org.apache.nutch.fetcher.Fetcher;
+import org.apache.nutch.indexer.IndexingJob;
+import org.apache.nutch.parse.ParseSegment;
+import org.apache.nutch.util.NutchTool;
+
+import com.google.common.collect.Maps;
+
+public class JobFactory {
+  private static Map<JobType, Class<? extends NutchTool>> typeToClass;
+
+  static {
+    typeToClass = Maps.newHashMap();
+    typeToClass.put(JobType.INJECT, Injector.class);
+    typeToClass.put(JobType.GENERATE, Generator.class);
+    typeToClass.put(JobType.FETCH, Fetcher.class);
+    typeToClass.put(JobType.PARSE, ParseSegment.class);
+    typeToClass.put(JobType.INDEX, IndexingJob.class);
+    typeToClass.put(JobType.UPDATEDB, CrawlDb.class);
+    typeToClass.put(JobType.INVERTLINKS, LinkDb.class);
+    typeToClass.put(JobType.DEDUP, DeduplicationJob.class);
+  }
+
+  public NutchTool createToolByType(JobType type, Configuration conf) {
+    if (!typeToClass.containsKey(type)) {
+      return null;
+    }
+    Class<? extends NutchTool> clz = typeToClass.get(type);
+    return createTool(clz, conf);
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public NutchTool createToolByClassName(String className, Configuration conf) {
+    try {
+      Class clz = Class.forName(className);
+      return createTool(clz, conf);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private NutchTool createTool(Class<? extends NutchTool> clz,
+      Configuration conf) {
+    return ReflectionUtils.newInstance(clz, conf);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nutch/blob/0bf453e5/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java
----------------------------------------------------------------------
diff --git a/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java
new file mode 100644
index 0000000..a915457
--- /dev/null
+++ b/nutch-core/src/main/java/org/apache/nutch/service/impl/JobManagerImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.service.impl;
+
+import java.util.Collection;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nutch.service.ConfManager;
+import org.apache.nutch.service.JobManager;
+import org.apache.nutch.service.model.request.JobConfig;
+import org.apache.nutch.service.model.response.JobInfo;
+import org.apache.nutch.service.model.response.JobInfo.State;
+import org.apache.nutch.util.NutchTool;
+
+public class JobManagerImpl implements JobManager {
+
+  private JobFactory jobFactory;
+  private NutchServerPoolExecutor executor;
+  private ConfManager configManager;
+
+  public JobManagerImpl(JobFactory jobFactory, ConfManager configManager, NutchServerPoolExecutor executor) {
+    this.jobFactory = jobFactory;
+    this.configManager = configManager;		
+    this.executor = executor;
+  }
+
+  @Override
+  public JobInfo create(JobConfig jobConfig) {
+    if (jobConfig.getArgs() == null) {
+      throw new IllegalArgumentException("Arguments cannot be null!");
+    }
+    Configuration conf = cloneConfiguration(jobConfig.getConfId());
+    NutchTool tool = createTool(jobConfig, conf);
+    JobWorker worker = new JobWorker(jobConfig, conf, tool);
+    executor.execute(worker);
+    executor.purge();		
+    return worker.getInfo();
+  }
+
+  private Configuration cloneConfiguration(String confId) {
+    Configuration conf = configManager.get(confId);
+    if (conf == null) {
+      throw new IllegalArgumentException("Unknown confId " + confId);
+    }
+    return new Configuration(conf);
+  }
+
+  @Override
+  public Collection<JobInfo> list(String crawlId, State state) {
+    if (state == null || state == State.ANY) {
+      return executor.getAllJobs();
+    }
+    if (state == State.RUNNING || state == State.IDLE) {
+      return executor.getJobRunning();
+    }
+    return executor.getJobHistory();
+  }
+
+  @Override
+  public JobInfo get(String crawlId, String jobId) {
+    return executor.getInfo(jobId);
+  }
+
+  @Override
+  public boolean abort(String crawlId, String id) {
+    return executor.findWorker(id).killJob();
+  }
+
+  @Override
+  public boolean stop(String crawlId, String id) {
+    return executor.findWorker(id).stopJob();
+  }
+
+  private NutchTool createTool(JobConfig jobConfig, Configuration conf){
+    if(StringUtils.isNotBlank(jobConfig.getJobClassName())){
+      return jobFactory.createToolByClassName(jobConfig.getJobClassName(), conf);
+    }
+    return jobFactory.createToolByType(jobConfig.getType(), conf);
+  }
+}