You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ab...@apache.org on 2006/04/12 01:36:46 UTC

svn commit: r393330 - /lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java

Author: ab
Date: Tue Apr 11 16:36:44 2006
New Revision: 393330

URL: http://svn.apache.org/viewcvs?rev=393330&view=rev
Log:
Improved SegmentReader:

* fix breakage - couldn't write to already existing subdirectory. Now
  output directory is specified in arguments.

* add functionality to retrieve individual records

* add functionality to list segment overviews

* add options to limit the reader's output to specified segment data.

Please see synopsis for more details.

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?rev=393330&r1=393329&r2=393330&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Tue Apr 11 16:36:44 2006
@@ -16,130 +16,143 @@
 
 package org.apache.nutch.segment;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.Iterator;
+import java.io.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
 import java.util.logging.Logger;
 
-import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.parse.ParseData;
 import org.apache.nutch.parse.ParseText;
 import org.apache.nutch.protocol.Content;
-import org.apache.hadoop.util.LogFormatter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
 import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
 
 /** Dump the content of a segment. */
 public class SegmentReader extends Configured implements Reducer {
 
-  public static final String DIR_NAME = "segdump";
-
-  public static final Logger LOG =
-    LogFormatter.getLogger(SegmentReader.class.getName());
+  public static final Logger LOG = LogFormatter.getLogger(SegmentReader.class.getName());
 
   long recNo = 0L;
+  
+  private boolean co, fe, ge, pa, pd, pt;
+  private FileSystem fs;
 
-  /** Wraps inputs in an {@link ObjectWritable}, to permit merging different
-   * types in reduce. */
+  /**
+   * Wraps inputs in an {@link ObjectWritable}, to permit merging different
+   * types in reduce.
+   */
   public static class InputFormat extends SequenceFileInputFormat {
-    public RecordReader getRecordReader(FileSystem fs, FileSplit split,
-                                        JobConf job, Reporter reporter)
-      throws IOException {
+    public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
+            throws IOException {
       reporter.setStatus(split.toString());
 
       return new SequenceFileRecordReader(job, split) {
-          public synchronized boolean next(Writable key, Writable value)
-            throws IOException {
-            ObjectWritable wrapper = (ObjectWritable)value;
-            try {
-              wrapper.set(getValueClass().newInstance());
-            } catch (Exception e) {
-              throw new IOException(e.toString());
-            }
-            return super.next(key, (Writable)wrapper.get());
+        public synchronized boolean next(Writable key, Writable value) throws IOException {
+          ObjectWritable wrapper = (ObjectWritable) value;
+          try {
+            wrapper.set(getValueClass().newInstance());
+          } catch (Exception e) {
+            throw new IOException(e.toString());
           }
-        };
+          return super.next(key, (Writable) wrapper.get());
+        }
+      };
     }
   }
 
-  /** Implements a text output format*/
-  public static class TextOutputFormat
-  extends org.apache.hadoop.mapred.OutputFormatBase {
-  public RecordWriter getRecordWriter(final FileSystem fs, JobConf job,
-                                      String name) throws IOException {
-
-   final File segmentDumpFile =
-     new File(new File(job.getOutputDir(), SegmentReader.DIR_NAME), name);
-
-   // Get the old copy out of the way
-   fs.delete(segmentDumpFile);
+  /** Implements a text output format */
+  public static class TextOutputFormat extends org.apache.hadoop.mapred.OutputFormatBase {
+    public RecordWriter getRecordWriter(final FileSystem fs, JobConf job, String name) throws IOException {
+
+      final File segmentDumpFile = new File(job.getOutputDir(), name);
+
+      // Get the old copy out of the way
+      fs.delete(segmentDumpFile);
+
+      final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
+      return new RecordWriter() {
+        public synchronized void write(WritableComparable key, Writable value) throws IOException {
+          ObjectWritable writable = (ObjectWritable) value;
+          printStream.println((String) writable.get());
+        }
 
-   final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
-   return new RecordWriter() {
-       public synchronized void write(WritableComparable key, Writable value)
-         throws IOException {
-         ObjectWritable writable = (ObjectWritable)value;
-         printStream.println((String)writable.get());
-       }
-       public synchronized void close(Reporter reporter) throws IOException {
-         printStream.close();
-       }
-     };
+        public synchronized void close(Reporter reporter) throws IOException {
+          printStream.close();
+        }
+      };
+    }
   }
-}
 
-  public SegmentReader() { 
-      super(null); 
+  public SegmentReader() {
+    super(null);
   }
-
-  public SegmentReader(Configuration conf) {
+  
+  public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, boolean pa,
+          boolean pd, boolean pt) {
     super(conf);
+    this.co = co;
+    this.fe = fe;
+    this.ge = ge;
+    this.pa = pa;
+    this.pd = pd;
+    this.pt = pt;
+    try {
+      this.fs = FileSystem.get(getConf());
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
   }
 
-  public void configure(JobConf job) {}
+  public void configure(JobConf job) {
+    setConf(job);
+    this.co = getConf().getBoolean("segment.reader.co", true);
+    this.fe = getConf().getBoolean("segment.reader.fe", true);
+    this.ge = getConf().getBoolean("segment.reader.ge", true);
+    this.pa = getConf().getBoolean("segment.reader.pa", true);
+    this.pd = getConf().getBoolean("segment.reader.pd", true);
+    this.pt = getConf().getBoolean("segment.reader.pt", true);
+    try {
+      this.fs = FileSystem.get(getConf());
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
 
+  private JobConf createJobConf() {
+    JobConf job = new JobConf(getConf());
+    job.setBoolean("segment.reader.co", this.co);
+    job.setBoolean("segment.reader.fe", this.fe);
+    job.setBoolean("segment.reader.ge", this.ge);
+    job.setBoolean("segment.reader.pa", this.pa);
+    job.setBoolean("segment.reader.pd", this.pd);
+    job.setBoolean("segment.reader.pt", this.pt);
+    return job;
+  }
+  
   public void close() {}
 
-  public void reduce(WritableComparable key, Iterator values,
-                     OutputCollector output, Reporter reporter)
-    throws IOException {
+  public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+          throws IOException {
     StringBuffer dump = new StringBuffer();
-    
+
     dump.append("\nRecno:: ").append(recNo++).append("\n");
-    dump.append("URL: " + key.toString() + "\n");
+    dump.append("URL:: " + key.toString() + "\n");
     while (values.hasNext()) {
-      Object value = ((ObjectWritable)values.next()).get(); // unwrap
+      Object value = ((ObjectWritable) values.next()).get(); // unwrap
       if (value instanceof CrawlDatum) {
-        dump.append("\nCrawlDatum::\n").append(((CrawlDatum)value).toString());  
+        dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
       } else if (value instanceof Content) {
-          dump.append("\nContent::\n").append(((Content)value).toString());
+        dump.append("\nContent::\n").append(((Content) value).toString());
       } else if (value instanceof ParseData) {
-          dump.append("\nParseData::\n").append(((ParseData)value).toString());
+        dump.append("\nParseData::\n").append(((ParseData) value).toString());
       } else if (value instanceof ParseText) {
-          dump.append("\nParseText::\n").append(((ParseText)value).toString());
+        dump.append("\nParseText::\n").append(((ParseText) value).toString());
       } else {
         LOG.warning("Unrecognized type: " + value.getClass());
       }
@@ -147,91 +160,433 @@
     output.collect(key, new ObjectWritable(dump.toString()));
   }
 
-  public void reader(File segment) throws IOException {
-    LOG.info("Reader: segment: " + segment);
+  public void dump(File segment, File output) throws IOException {
+    LOG.info("SegmentReader: dump segment: " + segment);
 
-    JobConf job = new NutchJob(getConf());
+    JobConf job = createJobConf();
     job.setJobName("read " + segment);
 
-    job.addInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME));
-    job.addInputDir(new File(segment, CrawlDatum.FETCH_DIR_NAME));
-    job.addInputDir(new File(segment, CrawlDatum.PARSE_DIR_NAME));
-    job.addInputDir(new File(segment, Content.DIR_NAME));
-    job.addInputDir(new File(segment, ParseData.DIR_NAME));
-    job.addInputDir(new File(segment, ParseText.DIR_NAME));
+    if (ge) job.addInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME));
+    if (fe) job.addInputDir(new File(segment, CrawlDatum.FETCH_DIR_NAME));
+    if (pa) job.addInputDir(new File(segment, CrawlDatum.PARSE_DIR_NAME));
+    if (co) job.addInputDir(new File(segment, Content.DIR_NAME));
+    if (pd) job.addInputDir(new File(segment, ParseData.DIR_NAME));
+    if (pt) job.addInputDir(new File(segment, ParseText.DIR_NAME));
 
     job.setInputFormat(InputFormat.class);
     job.setInputKeyClass(UTF8.class);
     job.setInputValueClass(ObjectWritable.class);
 
     job.setReducerClass(SegmentReader.class);
+
+    File tempDir = new File("/tmp/segread-" + new java.util.Random().nextInt());
+    fs.delete(tempDir);
     
-    job.setOutputDir(segment);
+    job.setOutputDir(tempDir);
     job.setOutputFormat(TextOutputFormat.class);
     job.setOutputKeyClass(UTF8.class);
     job.setOutputValueClass(ObjectWritable.class);
 
     JobClient.runJob(job);
-    
+
     // concatenate the output
-    FileSystem fs = FileSystem.get(job);
-    File directory = new File(job.getOutputDir(), SegmentReader.DIR_NAME);
-    File dumpFile = new File(directory, job.get("segment.dump.dir", "dump"));
+    File dumpFile = new File(output, job.get("segment.dump.dir", "dump"));
 
-    // remove the old file 
+    // remove the old file
     fs.delete(dumpFile);
-    File[] files = fs.listFiles(directory);
-    
+    File[] files = fs.listFiles(tempDir);
+
     PrintWriter writer = null;
-    int currentReccordNumber = 0;
+    int currentRecordNumber = 0;
     if (files.length > 0) {
-        writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));
-        try {
-            for (int i = 0 ; i < files.length; i++) {
-                File partFile = (File)files[i];
-                try {
-                    currentReccordNumber = append(fs, job, partFile, writer, currentReccordNumber);
-                } catch (IOException exception) {
-                    LOG.warning("Couldn't copy the content of " + partFile.toString() + " into " + dumpFile.toString());
-                    LOG.warning(exception.getMessage());
-                }
-            }
+      writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));
+      try {
+        for (int i = 0; i < files.length; i++) {
+          File partFile = (File) files[i];
+          try {
+            currentRecordNumber = append(fs, job, partFile, writer, currentRecordNumber);
+          } catch (IOException exception) {
+            LOG.warning("Couldn't copy the content of " + partFile.toString() + " into " + dumpFile.toString());
+            LOG.warning(exception.getMessage());
+          }
         }
-        finally {
-            writer.close();
+      } finally {
+        writer.close();
+      }
+    }
+    fs.delete(tempDir);
+    LOG.info("SegmentReader: done");
+  }
+
+  /** Appends two files and updates the Recno counter */
+  private int append(FileSystem fs, Configuration conf, File src, PrintWriter writer, int currentRecordNumber)
+          throws IOException {
+    BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(src)));
+    try {
+      String line = reader.readLine();
+      while (line != null) {
+        if (line.startsWith("Recno:: ")) {
+          line = "Recno:: " + currentRecordNumber++;
         }
+        writer.println(line);
+        line = reader.readLine();
+      }
+      return currentRecordNumber;
+    } finally {
+      reader.close();
     }
-    LOG.info("Reader: done");
   }
 
-  /** Appends two files and updates the Recno counter*/
-  private int append(FileSystem fs, Configuration conf, File src, PrintWriter writer, int currentReccordNumber) throws IOException {
-      BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(src)));
+  private static final String[][] keys = new String[][] {
+          {"co", "Content::\n"},
+          {"ge", "Crawl Generate::\n"},
+          {"fe", "Crawl Fetch::\n"},
+          {"pa", "Crawl Parse::\n"},
+          {"pd", "ParseData::\n"},
+          {"pt", "ParseText::\n"}
+  };
+
+  public void get(final File segment, final UTF8 key, Writer writer,
+          final Map results) throws Exception {
+    LOG.info("SegmentReader: get '" + key + "'");
+    ArrayList threads = new ArrayList();
+    if (co) threads.add(new Thread() {
+      public void run() {
+        try {
+          List res = getMapRecords(new File(segment, Content.DIR_NAME), key);
+          results.put("co", res);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    if (fe) threads.add(new Thread() {
+      public void run() {
+        try {
+          List res = getMapRecords(new File(segment, CrawlDatum.FETCH_DIR_NAME), key);
+          results.put("fe", res);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    if (ge) threads.add(new Thread() {
+      public void run() {
+        try {
+          List res = getSeqRecords(new File(segment, CrawlDatum.GENERATE_DIR_NAME), key);
+          results.put("ge", res);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    if (pa) threads.add(new Thread() {
+      public void run() {
+        try {
+          List res = getSeqRecords(new File(segment, CrawlDatum.PARSE_DIR_NAME), key);
+          results.put("pa", res);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    if (pd) threads.add(new Thread() {
+      public void run() {
+        try {
+          List res = getMapRecords(new File(segment, ParseData.DIR_NAME), key);
+          results.put("pd", res);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    if (pt) threads.add(new Thread() {
+      public void run() {
+        try {
+          List res = getMapRecords(new File(segment, ParseText.DIR_NAME), key);
+          results.put("pt", res);
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    Iterator it = threads.iterator();
+    while (it.hasNext()) ((Thread)it.next()).start();
+    int cnt = 0;
+    do {
       try {
-          String line = reader.readLine();
-          while(line != null) {
-              if (line.startsWith("Recno:: ")) {
-                  line = "Recno:: " + currentReccordNumber++;
-              }
-              writer.println(line);
-              line = reader.readLine();
-          }
-          return currentReccordNumber;
-      } finally {
-          reader.close();
+        Thread.sleep(5000);
+      } catch (Exception e) {};
+      it = threads.iterator();
+      while (it.hasNext()) {
+        if (((Thread)it.next()).isAlive()) cnt++;
       }
+      if (cnt > 0) System.err.println("(" + cnt + " to retrieve)");
+    } while (cnt > 0);
+    for (int i = 0; i < keys.length; i++) {
+      List res = (List)results.get(keys[i][0]);
+      if (res != null && res.size() > 0) {
+        for (int k = 0; k < res.size(); k++) {
+          writer.write(keys[i][1]);
+          writer.write(res.get(k) + "\n");
+        }
+      }
+      writer.flush();
+    }
   }
   
-  public static void main(String[] args) throws Exception {
-    Configuration conf = NutchConfiguration.create();
-    SegmentReader segmentReader = new SegmentReader(conf);
+  private List getMapRecords(File dir, UTF8 key) throws Exception {
+    MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
+    ArrayList res = new ArrayList();
+    Class keyClass = readers[0].getKeyClass();
+    Class valueClass = readers[0].getValueClass();
+    if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8"))
+      throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+    Writable value = (Writable)valueClass.newInstance();
+    // we don't know the partitioning schema
+    for (int i = 0; i < readers.length; i++) {
+      if (readers[i].get(key, value) != null)
+        res.add(value);
+      readers[i].close();
+    }
+    return res;
+  }
 
-    String usage = "Usage: SegmentReader <segment>";
+  private List getSeqRecords(File dir, UTF8 key) throws Exception {
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), dir);
+    ArrayList res = new ArrayList();
+    Class keyClass = readers[0].getKeyClass();
+    Class valueClass = readers[0].getValueClass();
+    if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8"))
+      throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+    Writable aKey = (Writable)keyClass.newInstance();
+    Writable value = (Writable)valueClass.newInstance();
+    for (int i = 0; i < readers.length; i++) {
+      while (readers[i].next(aKey, value)) {
+        if (aKey.equals(key))
+          res.add(value);
+      }
+      readers[i].close();
+    }
+    return res;
+  }
 
-    if (args.length == 0) {
-      System.err.println(usage);
-      System.exit(-1);
+  public static class SegmentReaderStats {
+    public long start = -1L;
+    public long end = -1L;
+    public long generated = -1L;
+    public long fetched = -1L;
+    public long fetchErrors = -1L;
+    public long parsed = -1L;
+    public long parseErrors = -1L;
+  }
+  
+  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+  
+  public void list(List dirs, Writer writer) throws Exception {
+    writer.write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n");
+    for (int i = 0; i < dirs.size(); i++) {
+      File dir = (File)dirs.get(i);
+      SegmentReaderStats stats = new SegmentReaderStats();
+      getStats(dir, stats);
+      writer.write(dir.getName() + "\t");
+      if (stats.generated == -1) writer.write("?");
+      else writer.write(stats.generated + "");
+      writer.write("\t\t");
+      if (stats.start == -1) writer.write("?\t");
+      else writer.write(sdf.format(new Date(stats.start)));
+      writer.write("\t");
+      if (stats.end == -1) writer.write("?");
+      else writer.write(sdf.format(new Date(stats.end)));
+      writer.write("\t");
+      if (stats.fetched == -1) writer.write("?");
+      else writer.write(stats.fetched + "");
+      writer.write("\t");
+      if (stats.parsed == -1) writer.write("?");
+      else writer.write(stats.parsed + "");
+      writer.write("\n");
+      writer.flush();
+    }
+  }
+  
+  public void getStats(File segment, final SegmentReaderStats stats) throws Exception {
+    SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), new File(segment, CrawlDatum.GENERATE_DIR_NAME));
+    long cnt = 0L;
+    UTF8 key = new UTF8();
+    for (int i = 0; i < readers.length; i++) {
+      while (readers[i].next(key)) cnt++;
+      readers[i].close();
     }
-    segmentReader.reader(new File(args[0]));
+    stats.generated = cnt;
+    File fetchDir = new File(segment, CrawlDatum.FETCH_DIR_NAME);
+    if (fs.exists(fetchDir) && fs.isDirectory(fetchDir)) {
+      cnt = 0L;
+      long start = Long.MAX_VALUE;
+      long end = Long.MIN_VALUE;
+      CrawlDatum value = new CrawlDatum();
+      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, getConf());
+      for (int i = 0; i < mreaders.length; i++) {
+        while (mreaders[i].next(key, value)) {
+          cnt++;
+          if (value.getFetchTime() < start) start = value.getFetchTime();
+          if (value.getFetchTime() > end) end = value.getFetchTime();
+        }
+        mreaders[i].close();
+      }
+      stats.start = start;
+      stats.end = end;
+      stats.fetched = cnt;
+    }
+    File parseDir = new File(segment, ParseData.DIR_NAME);
+    if (fs.exists(fetchDir) && fs.isDirectory(fetchDir)) {
+      cnt = 0L;
+      long errors = 0L;
+      ParseData value = new ParseData();
+      MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, getConf());
+      for (int i = 0; i < mreaders.length; i++) {
+        while (mreaders[i].next(key, value)) {
+          cnt++;
+          if (!value.getStatus().isSuccess()) errors++;
+        }
+        mreaders[i].close();
+      }
+      stats.parsed = cnt;
+      stats.parseErrors = errors;
+    }
+  }
+  
+  private static final int MODE_DUMP = 0;
+
+  private static final int MODE_LIST = 1;
+
+  private static final int MODE_GET = 2;
+
+  public static void main(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return;
+    }
+    int mode = -1;
+    if (args[0].equals("-dump"))
+      mode = MODE_DUMP;
+    else if (args[0].equals("-list"))
+      mode = MODE_LIST;
+    else if (args[0].equals("-get")) mode = MODE_GET;
+
+    boolean co = true;
+    boolean fe = true;
+    boolean ge = true;
+    boolean pa = true;
+    boolean pd = true;
+    boolean pt = true;
+    // collect general options
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-nocontent")) {
+        co = false;
+        args[i] = null;
+      } else if (args[i].equals("-nofetch")) {
+        fe = false;
+        args[i] = null;
+      } else if (args[i].equals("-nogenerate")) {
+        ge = false;
+        args[i] = null;
+      } else if (args[i].equals("-noparse")) {
+        pa = false;
+        args[i] = null;
+      } else if (args[i].equals("-noparsedata")) {
+        pd = false;
+        args[i] = null;
+      } else if (args[i].equals("-noparsetext")) {
+        pt = false;
+        args[i] = null;
+      }
+    }
+    Configuration conf = NutchConfiguration.create();
+    FileSystem fs = FileSystem.get(conf);
+    SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, pt);
+    // collect required args
+    switch (mode) {
+      case MODE_DUMP:
+        String input = args[1];
+        if (input == null) {
+          System.err.println("Missing required argument: <segment_dir>");
+          usage();
+          return;
+        }
+        String output = args.length > 2 ? args[2] : null;
+        if (output == null) {
+          System.err.println("Missing required argument: <output>");
+          usage();
+          return;
+        }
+        segmentReader.dump(new File(input), new File(output));
+        return;
+      case MODE_LIST:
+        ArrayList dirs = new ArrayList();
+        for (int i = 1; i < args.length; i++) {
+          if (args[i] == null) continue;
+          if (args[i].equals("-dir")) {
+            File dir = new File(args[++i]);
+            File[] files = fs.listFiles(dir, new FileFilter() {
+              public boolean accept(File pathname) {
+                if (pathname.isDirectory()) return true;
+                return false;
+              }
+            });
+            if (files != null && files.length > 0) {
+              dirs.addAll(Arrays.asList(files));
+            }
+          } else dirs.add(new File(args[i]));
+        }
+        segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
+        return;
+      case MODE_GET:
+        input = args[1];
+        if (input == null) {
+          System.err.println("Missing required argument: <segment_dir>");
+          usage();
+          return;
+        }
+        String key = args.length > 2 ? args[2] : null;
+        if (key == null) {
+          System.err.println("Missing required argument: <keyValue>");
+          usage();
+          return;
+        }
+        segmentReader.get(new File(input), new UTF8(key), new OutputStreamWriter(System.out, "UTF-8"), new HashMap());
+        return;
+      default:
+        System.err.println("Invalid operation: " + args[0]);
+        usage();
+        return;
+    }
+  }
+
+  private static void usage() {
+    System.err.println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n");
+    System.err.println("* General options:");
+    System.err.println("\t-nocontent\tignore content directory");
+    System.err.println("\t-nofetch\tignore crawl_fetch directory");
+    System.err.println("\t-nogenerate\tignore crawl_generate directory");
+    System.err.println("\t-noparse\tignore crawl_parse directory");
+    System.err.println("\t-noparsedata\tignore parse_data directory");
+    System.err.println("\t-noparsetext\tignore parse_text directory");
+    System.err.println();
+    System.err.println("* SegmentReader -dump <segment_dir> <output> [general options]");
+    System.err.println("  Dumps content of a <segment_dir> as a text file to <output>.\n");
+    System.err.println("\t<segment_dir>\tname of the segment directory.");
+    System.err.println("\t<output>\tname of the (non-existent) output directory.");
+    System.err.println();
+    System.err.println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]");
+    System.err.println("  List a synopsis of segments in specified directories, or all segments in");
+    System.err.println("  a directory <segments>, and print it on System.out\n");
+    System.err.println("\t<segment_dir1> ...\tlist of segment directories to process");
+    System.err.println("\t-dir <segments>\t\tdirectory that contains multiple segments");
+    System.err.println();
+    System.err.println("* SegmentReader -get <segment_dir> <keyValue> [general options]");
+    System.err.println("  Get a specified record from a segment, and print it on System.out.\n");
+    System.err.println("\t<segment_dir>\tname of the segment directory.");
+    System.err.println("\t<keyValue>\tvalue of the key (url).");
+    System.err.println("\t\tNote: put double-quotes around strings with spaces.");
   }
 }