Posted to commits@nutch.apache.org by ab...@apache.org on 2006/04/12 01:36:46 UTC
svn commit: r393330 -
/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
Author: ab
Date: Tue Apr 11 16:36:44 2006
New Revision: 393330
URL: http://svn.apache.org/viewcvs?rev=393330&view=rev
Log:
Improved SegmentReader:
* fix breakage - couldn't write to an already existing subdirectory. The
output directory is now specified as an argument.
* add functionality to retrieve individual records
* add functionality to list segment overviews
* add options to limit the reader's output to specified segment data.
Please see the synopsis for more details.
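In brief, the new modes are:

  SegmentReader -dump <segment_dir> <output> [general options]
  SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]
  SegmentReader -get <segment_dir> <keyValue> [general options]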
Modified:
lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java?rev=393330&r1=393329&r2=393330&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/segment/SegmentReader.java Tue Apr 11 16:36:44 2006
@@ -16,130 +16,143 @@
package org.apache.nutch.segment;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintStream;
-import java.io.PrintWriter;
-import java.util.Iterator;
+import java.io.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
import java.util.logging.Logger;
-import org.apache.nutch.crawl.CrawlDatum;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.ObjectWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
-import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.protocol.Content;
-import org.apache.hadoop.util.LogFormatter;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
import org.apache.nutch.util.NutchConfiguration;
-import org.apache.nutch.util.NutchJob;
/** Dump the content of a segment. */
public class SegmentReader extends Configured implements Reducer {
- public static final String DIR_NAME = "segdump";
-
- public static final Logger LOG =
- LogFormatter.getLogger(SegmentReader.class.getName());
+ public static final Logger LOG = LogFormatter.getLogger(SegmentReader.class.getName());
long recNo = 0L;
+
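+  // which segment parts to process: content, crawl_fetch, crawl_generate, crawl_parse, parse_data, parse_text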
+ private boolean co, fe, ge, pa, pd, pt;
+ private FileSystem fs;
- /** Wraps inputs in an {@link ObjectWritable}, to permit merging different
- * types in reduce. */
+ /**
+ * Wraps inputs in an {@link ObjectWritable}, to permit merging different
+ * types in reduce.
+ */
public static class InputFormat extends SequenceFileInputFormat {
- public RecordReader getRecordReader(FileSystem fs, FileSplit split,
- JobConf job, Reporter reporter)
- throws IOException {
+ public RecordReader getRecordReader(FileSystem fs, FileSplit split, JobConf job, Reporter reporter)
+ throws IOException {
reporter.setStatus(split.toString());
return new SequenceFileRecordReader(job, split) {
- public synchronized boolean next(Writable key, Writable value)
- throws IOException {
- ObjectWritable wrapper = (ObjectWritable)value;
- try {
- wrapper.set(getValueClass().newInstance());
- } catch (Exception e) {
- throw new IOException(e.toString());
- }
- return super.next(key, (Writable)wrapper.get());
+ public synchronized boolean next(Writable key, Writable value) throws IOException {
+ ObjectWritable wrapper = (ObjectWritable) value;
+ try {
+ wrapper.set(getValueClass().newInstance());
+ } catch (Exception e) {
+ throw new IOException(e.toString());
}
- };
+ return super.next(key, (Writable) wrapper.get());
+ }
+ };
}
}
- /** Implements a text output format*/
- public static class TextOutputFormat
- extends org.apache.hadoop.mapred.OutputFormatBase {
- public RecordWriter getRecordWriter(final FileSystem fs, JobConf job,
- String name) throws IOException {
-
- final File segmentDumpFile =
- new File(new File(job.getOutputDir(), SegmentReader.DIR_NAME), name);
-
- // Get the old copy out of the way
- fs.delete(segmentDumpFile);
+ /** Implements a text output format */
+ public static class TextOutputFormat extends org.apache.hadoop.mapred.OutputFormatBase {
+ public RecordWriter getRecordWriter(final FileSystem fs, JobConf job, String name) throws IOException {
+
+ final File segmentDumpFile = new File(job.getOutputDir(), name);
+
+ // Get the old copy out of the way
+ fs.delete(segmentDumpFile);
+
+ final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
+ return new RecordWriter() {
+ public synchronized void write(WritableComparable key, Writable value) throws IOException {
+ ObjectWritable writable = (ObjectWritable) value;
+ printStream.println((String) writable.get());
+ }
- final PrintStream printStream = new PrintStream(fs.create(segmentDumpFile));
- return new RecordWriter() {
- public synchronized void write(WritableComparable key, Writable value)
- throws IOException {
- ObjectWritable writable = (ObjectWritable)value;
- printStream.println((String)writable.get());
- }
- public synchronized void close(Reporter reporter) throws IOException {
- printStream.close();
- }
- };
+ public synchronized void close(Reporter reporter) throws IOException {
+ printStream.close();
+ }
+ };
+ }
}
-}
- public SegmentReader() {
- super(null);
+ public SegmentReader() {
+ super(null);
}
-
- public SegmentReader(Configuration conf) {
+
+ public SegmentReader(Configuration conf, boolean co, boolean fe, boolean ge, boolean pa,
+ boolean pd, boolean pt) {
super(conf);
+ this.co = co;
+ this.fe = fe;
+ this.ge = ge;
+ this.pa = pa;
+ this.pd = pd;
+ this.pt = pt;
+ try {
+ this.fs = FileSystem.get(getConf());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
}
- public void configure(JobConf job) {}
+ public void configure(JobConf job) {
+ setConf(job);
+ this.co = getConf().getBoolean("segment.reader.co", true);
+ this.fe = getConf().getBoolean("segment.reader.fe", true);
+ this.ge = getConf().getBoolean("segment.reader.ge", true);
+ this.pa = getConf().getBoolean("segment.reader.pa", true);
+ this.pd = getConf().getBoolean("segment.reader.pd", true);
+ this.pt = getConf().getBoolean("segment.reader.pt", true);
+ try {
+ this.fs = FileSystem.get(getConf());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
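+  /** Creates a JobConf carrying the part-selection flags for the MapReduce job. */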
+ private JobConf createJobConf() {
+ JobConf job = new JobConf(getConf());
+ job.setBoolean("segment.reader.co", this.co);
+ job.setBoolean("segment.reader.fe", this.fe);
+ job.setBoolean("segment.reader.ge", this.ge);
+ job.setBoolean("segment.reader.pa", this.pa);
+ job.setBoolean("segment.reader.pd", this.pd);
+ job.setBoolean("segment.reader.pt", this.pt);
+ return job;
+ }
+
public void close() {}
- public void reduce(WritableComparable key, Iterator values,
- OutputCollector output, Reporter reporter)
- throws IOException {
+ public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+ throws IOException {
StringBuffer dump = new StringBuffer();
-
+
dump.append("\nRecno:: ").append(recNo++).append("\n");
- dump.append("URL: " + key.toString() + "\n");
+ dump.append("URL:: " + key.toString() + "\n");
while (values.hasNext()) {
- Object value = ((ObjectWritable)values.next()).get(); // unwrap
+ Object value = ((ObjectWritable) values.next()).get(); // unwrap
if (value instanceof CrawlDatum) {
- dump.append("\nCrawlDatum::\n").append(((CrawlDatum)value).toString());
+ dump.append("\nCrawlDatum::\n").append(((CrawlDatum) value).toString());
} else if (value instanceof Content) {
- dump.append("\nContent::\n").append(((Content)value).toString());
+ dump.append("\nContent::\n").append(((Content) value).toString());
} else if (value instanceof ParseData) {
- dump.append("\nParseData::\n").append(((ParseData)value).toString());
+ dump.append("\nParseData::\n").append(((ParseData) value).toString());
} else if (value instanceof ParseText) {
- dump.append("\nParseText::\n").append(((ParseText)value).toString());
+ dump.append("\nParseText::\n").append(((ParseText) value).toString());
} else {
LOG.warning("Unrecognized type: " + value.getClass());
}
@@ -147,91 +160,433 @@
output.collect(key, new ObjectWritable(dump.toString()));
}
- public void reader(File segment) throws IOException {
- LOG.info("Reader: segment: " + segment);
+ public void dump(File segment, File output) throws IOException {
+ LOG.info("SegmentReader: dump segment: " + segment);
- JobConf job = new NutchJob(getConf());
+ JobConf job = createJobConf();
job.setJobName("read " + segment);
- job.addInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME));
- job.addInputDir(new File(segment, CrawlDatum.FETCH_DIR_NAME));
- job.addInputDir(new File(segment, CrawlDatum.PARSE_DIR_NAME));
- job.addInputDir(new File(segment, Content.DIR_NAME));
- job.addInputDir(new File(segment, ParseData.DIR_NAME));
- job.addInputDir(new File(segment, ParseText.DIR_NAME));
+ if (ge) job.addInputDir(new File(segment, CrawlDatum.GENERATE_DIR_NAME));
+ if (fe) job.addInputDir(new File(segment, CrawlDatum.FETCH_DIR_NAME));
+ if (pa) job.addInputDir(new File(segment, CrawlDatum.PARSE_DIR_NAME));
+ if (co) job.addInputDir(new File(segment, Content.DIR_NAME));
+ if (pd) job.addInputDir(new File(segment, ParseData.DIR_NAME));
+ if (pt) job.addInputDir(new File(segment, ParseText.DIR_NAME));
job.setInputFormat(InputFormat.class);
job.setInputKeyClass(UTF8.class);
job.setInputValueClass(ObjectWritable.class);
job.setReducerClass(SegmentReader.class);
+
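+    // run the job into a temporary directory, then concatenate the part files below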
+ File tempDir = new File("/tmp/segread-" + new java.util.Random().nextInt());
+ fs.delete(tempDir);
- job.setOutputDir(segment);
+ job.setOutputDir(tempDir);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(UTF8.class);
job.setOutputValueClass(ObjectWritable.class);
JobClient.runJob(job);
-
+
// concatenate the output
- FileSystem fs = FileSystem.get(job);
- File directory = new File(job.getOutputDir(), SegmentReader.DIR_NAME);
- File dumpFile = new File(directory, job.get("segment.dump.dir", "dump"));
+ File dumpFile = new File(output, job.get("segment.dump.dir", "dump"));
- // remove the old file
+ // remove the old file
fs.delete(dumpFile);
- File[] files = fs.listFiles(directory);
-
+ File[] files = fs.listFiles(tempDir);
+
PrintWriter writer = null;
- int currentReccordNumber = 0;
+ int currentRecordNumber = 0;
if (files.length > 0) {
- writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));
- try {
- for (int i = 0 ; i < files.length; i++) {
- File partFile = (File)files[i];
- try {
- currentReccordNumber = append(fs, job, partFile, writer, currentReccordNumber);
- } catch (IOException exception) {
- LOG.warning("Couldn't copy the content of " + partFile.toString() + " into " + dumpFile.toString());
- LOG.warning(exception.getMessage());
- }
- }
+ writer = new PrintWriter(new BufferedWriter(new OutputStreamWriter(fs.create(dumpFile))));
+ try {
+ for (int i = 0; i < files.length; i++) {
+ File partFile = (File) files[i];
+ try {
+ currentRecordNumber = append(fs, job, partFile, writer, currentRecordNumber);
+ } catch (IOException exception) {
+ LOG.warning("Couldn't copy the content of " + partFile.toString() + " into " + dumpFile.toString());
+ LOG.warning(exception.getMessage());
+ }
}
- finally {
- writer.close();
+ } finally {
+ writer.close();
+ }
+ }
+ fs.delete(tempDir);
+ LOG.info("SegmentReader: done");
+ }
+
+ /** Appends two files and updates the Recno counter */
+ private int append(FileSystem fs, Configuration conf, File src, PrintWriter writer, int currentRecordNumber)
+ throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(src)));
+ try {
+ String line = reader.readLine();
+ while (line != null) {
+ if (line.startsWith("Recno:: ")) {
+ line = "Recno:: " + currentRecordNumber++;
}
+ writer.println(line);
+ line = reader.readLine();
+ }
+ return currentRecordNumber;
+ } finally {
+ reader.close();
}
- LOG.info("Reader: done");
}
- /** Appends two files and updates the Recno counter*/
- private int append(FileSystem fs, Configuration conf, File src, PrintWriter writer, int currentReccordNumber) throws IOException {
- BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(src)));
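+  // section labels, in output order, used when printing "-get" results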
+ private static final String[][] keys = new String[][] {
+ {"co", "Content::\n"},
+ {"ge", "Crawl Generate::\n"},
+ {"fe", "Crawl Fetch::\n"},
+ {"pa", "Crawl Parse::\n"},
+ {"pd", "ParseData::\n"},
+ {"pt", "ParseText::\n"}
+ };
+
+ public void get(final File segment, final UTF8 key, Writer writer,
+ final Map results) throws Exception {
+ LOG.info("SegmentReader: get '" + key + "'");
+ ArrayList threads = new ArrayList();
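+    // retrieve records from each selected segment part in a separate thread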
+ if (co) threads.add(new Thread() {
+ public void run() {
+ try {
+ List res = getMapRecords(new File(segment, Content.DIR_NAME), key);
+ results.put("co", res);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ if (fe) threads.add(new Thread() {
+ public void run() {
+ try {
+ List res = getMapRecords(new File(segment, CrawlDatum.FETCH_DIR_NAME), key);
+ results.put("fe", res);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ if (ge) threads.add(new Thread() {
+ public void run() {
+ try {
+ List res = getSeqRecords(new File(segment, CrawlDatum.GENERATE_DIR_NAME), key);
+ results.put("ge", res);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ if (pa) threads.add(new Thread() {
+ public void run() {
+ try {
+ List res = getSeqRecords(new File(segment, CrawlDatum.PARSE_DIR_NAME), key);
+ results.put("pa", res);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ if (pd) threads.add(new Thread() {
+ public void run() {
+ try {
+ List res = getMapRecords(new File(segment, ParseData.DIR_NAME), key);
+ results.put("pd", res);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ if (pt) threads.add(new Thread() {
+ public void run() {
+ try {
+ List res = getMapRecords(new File(segment, ParseText.DIR_NAME), key);
+ results.put("pt", res);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ Iterator it = threads.iterator();
+ while (it.hasNext()) ((Thread)it.next()).start();
+    // poll every 5 seconds until all retrieval threads have finished
+    int cnt;
+    do {
+      cnt = 0;
try {
- String line = reader.readLine();
- while(line != null) {
- if (line.startsWith("Recno:: ")) {
- line = "Recno:: " + currentReccordNumber++;
- }
- writer.println(line);
- line = reader.readLine();
- }
- return currentReccordNumber;
- } finally {
- reader.close();
+ Thread.sleep(5000);
+      } catch (Exception e) {}
+ it = threads.iterator();
+ while (it.hasNext()) {
+ if (((Thread)it.next()).isAlive()) cnt++;
}
+ if (cnt > 0) System.err.println("(" + cnt + " to retrieve)");
+ } while (cnt > 0);
+ for (int i = 0; i < keys.length; i++) {
+ List res = (List)results.get(keys[i][0]);
+ if (res != null && res.size() > 0) {
+ for (int k = 0; k < res.size(); k++) {
+ writer.write(keys[i][1]);
+ writer.write(res.get(k) + "\n");
+ }
+ }
+ writer.flush();
+ }
}
- public static void main(String[] args) throws Exception {
- Configuration conf = NutchConfiguration.create();
- SegmentReader segmentReader = new SegmentReader(conf);
+ private List getMapRecords(File dir, UTF8 key) throws Exception {
+ MapFile.Reader[] readers = MapFileOutputFormat.getReaders(fs, dir, getConf());
+ ArrayList res = new ArrayList();
+ Class keyClass = readers[0].getKeyClass();
+ Class valueClass = readers[0].getValueClass();
+ if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8"))
+ throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+ Writable value = (Writable)valueClass.newInstance();
+ // we don't know the partitioning schema
+ for (int i = 0; i < readers.length; i++) {
+      if (readers[i].get(key, value) != null) {
+        res.add(value);
+        // use a fresh instance for the next reader so stored results aren't overwritten
+        value = (Writable) valueClass.newInstance();
+      }
+ readers[i].close();
+ }
+ return res;
+ }
- String usage = "Usage: SegmentReader <segment>";
+ private List getSeqRecords(File dir, UTF8 key) throws Exception {
+ SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), dir);
+ ArrayList res = new ArrayList();
+ Class keyClass = readers[0].getKeyClass();
+ Class valueClass = readers[0].getValueClass();
+ if (!keyClass.getName().equals("org.apache.hadoop.io.UTF8"))
+ throw new IOException("Incompatible key (" + keyClass.getName() + ")");
+ Writable aKey = (Writable)keyClass.newInstance();
+ Writable value = (Writable)valueClass.newInstance();
+ for (int i = 0; i < readers.length; i++) {
+ while (readers[i].next(aKey, value)) {
+        if (aKey.equals(key)) {
+          res.add(value);
+          // use a fresh instance so earlier matches aren't overwritten by later reads
+          value = (Writable) valueClass.newInstance();
+        }
+ }
+ readers[i].close();
+ }
+ return res;
+ }
- if (args.length == 0) {
- System.err.println(usage);
- System.exit(-1);
+ public static class SegmentReaderStats {
+ public long start = -1L;
+ public long end = -1L;
+ public long generated = -1L;
+ public long fetched = -1L;
+ public long fetchErrors = -1L;
+ public long parsed = -1L;
+ public long parseErrors = -1L;
+ }
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
+
+ public void list(List dirs, Writer writer) throws Exception {
+ writer.write("NAME\t\tGENERATED\tFETCHER START\t\tFETCHER END\t\tFETCHED\tPARSED\n");
+ for (int i = 0; i < dirs.size(); i++) {
+ File dir = (File)dirs.get(i);
+ SegmentReaderStats stats = new SegmentReaderStats();
+ getStats(dir, stats);
+ writer.write(dir.getName() + "\t");
+ if (stats.generated == -1) writer.write("?");
+ else writer.write(stats.generated + "");
+ writer.write("\t\t");
+ if (stats.start == -1) writer.write("?\t");
+ else writer.write(sdf.format(new Date(stats.start)));
+ writer.write("\t");
+ if (stats.end == -1) writer.write("?");
+ else writer.write(sdf.format(new Date(stats.end)));
+ writer.write("\t");
+ if (stats.fetched == -1) writer.write("?");
+ else writer.write(stats.fetched + "");
+ writer.write("\t");
+ if (stats.parsed == -1) writer.write("?");
+ else writer.write(stats.parsed + "");
+ writer.write("\n");
+ writer.flush();
+ }
+ }
+
+ public void getStats(File segment, final SegmentReaderStats stats) throws Exception {
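+    // count the entries in crawl_generate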
+ SequenceFile.Reader[] readers = SequenceFileOutputFormat.getReaders(getConf(), new File(segment, CrawlDatum.GENERATE_DIR_NAME));
+ long cnt = 0L;
+ UTF8 key = new UTF8();
+ for (int i = 0; i < readers.length; i++) {
+ while (readers[i].next(key)) cnt++;
+ readers[i].close();
}
- segmentReader.reader(new File(args[0]));
+ stats.generated = cnt;
+ File fetchDir = new File(segment, CrawlDatum.FETCH_DIR_NAME);
+ if (fs.exists(fetchDir) && fs.isDirectory(fetchDir)) {
+ cnt = 0L;
+ long start = Long.MAX_VALUE;
+ long end = Long.MIN_VALUE;
+ CrawlDatum value = new CrawlDatum();
+ MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, fetchDir, getConf());
+ for (int i = 0; i < mreaders.length; i++) {
+ while (mreaders[i].next(key, value)) {
+ cnt++;
+ if (value.getFetchTime() < start) start = value.getFetchTime();
+ if (value.getFetchTime() > end) end = value.getFetchTime();
+ }
+ mreaders[i].close();
+ }
+ stats.start = start;
+ stats.end = end;
+ stats.fetched = cnt;
+ }
+ File parseDir = new File(segment, ParseData.DIR_NAME);
+    if (fs.exists(parseDir) && fs.isDirectory(parseDir)) {
+ cnt = 0L;
+ long errors = 0L;
+ ParseData value = new ParseData();
+ MapFile.Reader[] mreaders = MapFileOutputFormat.getReaders(fs, parseDir, getConf());
+ for (int i = 0; i < mreaders.length; i++) {
+ while (mreaders[i].next(key, value)) {
+ cnt++;
+ if (!value.getStatus().isSuccess()) errors++;
+ }
+ mreaders[i].close();
+ }
+ stats.parsed = cnt;
+ stats.parseErrors = errors;
+ }
+ }
+
+ private static final int MODE_DUMP = 0;
+
+ private static final int MODE_LIST = 1;
+
+ private static final int MODE_GET = 2;
+
+ public static void main(String[] args) throws Exception {
+ if (args.length < 2) {
+ usage();
+ return;
+ }
+ int mode = -1;
+ if (args[0].equals("-dump"))
+ mode = MODE_DUMP;
+ else if (args[0].equals("-list"))
+ mode = MODE_LIST;
+ else if (args[0].equals("-get")) mode = MODE_GET;
+
+ boolean co = true;
+ boolean fe = true;
+ boolean ge = true;
+ boolean pa = true;
+ boolean pd = true;
+ boolean pt = true;
+ // collect general options
+ for (int i = 1; i < args.length; i++) {
+ if (args[i].equals("-nocontent")) {
+ co = false;
+ args[i] = null;
+ } else if (args[i].equals("-nofetch")) {
+ fe = false;
+ args[i] = null;
+ } else if (args[i].equals("-nogenerate")) {
+ ge = false;
+ args[i] = null;
+ } else if (args[i].equals("-noparse")) {
+ pa = false;
+ args[i] = null;
+ } else if (args[i].equals("-noparsedata")) {
+ pd = false;
+ args[i] = null;
+ } else if (args[i].equals("-noparsetext")) {
+ pt = false;
+ args[i] = null;
+ }
+ }
+ Configuration conf = NutchConfiguration.create();
+ FileSystem fs = FileSystem.get(conf);
+ SegmentReader segmentReader = new SegmentReader(conf, co, fe, ge, pa, pd, pt);
+ // collect required args
+ switch (mode) {
+ case MODE_DUMP:
+ String input = args[1];
+ if (input == null) {
+ System.err.println("Missing required argument: <segment_dir>");
+ usage();
+ return;
+ }
+ String output = args.length > 2 ? args[2] : null;
+ if (output == null) {
+ System.err.println("Missing required argument: <output>");
+ usage();
+ return;
+ }
+ segmentReader.dump(new File(input), new File(output));
+ return;
+ case MODE_LIST:
+ ArrayList dirs = new ArrayList();
+ for (int i = 1; i < args.length; i++) {
+ if (args[i] == null) continue;
+ if (args[i].equals("-dir")) {
+ File dir = new File(args[++i]);
+ File[] files = fs.listFiles(dir, new FileFilter() {
+ public boolean accept(File pathname) {
+            return pathname.isDirectory();
+ }
+ });
+ if (files != null && files.length > 0) {
+ dirs.addAll(Arrays.asList(files));
+ }
+ } else dirs.add(new File(args[i]));
+ }
+ segmentReader.list(dirs, new OutputStreamWriter(System.out, "UTF-8"));
+ return;
+ case MODE_GET:
+ input = args[1];
+ if (input == null) {
+ System.err.println("Missing required argument: <segment_dir>");
+ usage();
+ return;
+ }
+ String key = args.length > 2 ? args[2] : null;
+ if (key == null) {
+ System.err.println("Missing required argument: <keyValue>");
+ usage();
+ return;
+ }
+ segmentReader.get(new File(input), new UTF8(key), new OutputStreamWriter(System.out, "UTF-8"), new HashMap());
+ return;
+ default:
+ System.err.println("Invalid operation: " + args[0]);
+ usage();
+ return;
+ }
+ }
+
+ private static void usage() {
+ System.err.println("Usage: SegmentReader (-dump ... | -list ... | -get ...) [general options]\n");
+ System.err.println("* General options:");
+ System.err.println("\t-nocontent\tignore content directory");
+ System.err.println("\t-nofetch\tignore crawl_fetch directory");
+ System.err.println("\t-nogenerate\tignore crawl_generate directory");
+ System.err.println("\t-noparse\tignore crawl_parse directory");
+ System.err.println("\t-noparsedata\tignore parse_data directory");
+ System.err.println("\t-noparsetext\tignore parse_text directory");
+ System.err.println();
+ System.err.println("* SegmentReader -dump <segment_dir> <output> [general options]");
+ System.err.println(" Dumps content of a <segment_dir> as a text file to <output>.\n");
+ System.err.println("\t<segment_dir>\tname of the segment directory.");
+ System.err.println("\t<output>\tname of the (non-existent) output directory.");
+ System.err.println();
+ System.err.println("* SegmentReader -list (<segment_dir1> ... | -dir <segments>) [general options]");
+ System.err.println(" List a synopsis of segments in specified directories, or all segments in");
+ System.err.println(" a directory <segments>, and print it on System.out\n");
+ System.err.println("\t<segment_dir1> ...\tlist of segment directories to process");
+ System.err.println("\t-dir <segments>\t\tdirectory that contains multiple segments");
+ System.err.println();
+ System.err.println("* SegmentReader -get <segment_dir> <keyValue> [general options]");
+ System.err.println(" Get a specified record from a segment, and print it on System.out.\n");
+ System.err.println("\t<segment_dir>\tname of the segment directory.");
+ System.err.println("\t<keyValue>\tvalue of the key (url).");
+ System.err.println("\t\tNote: put double-quotes around strings with spaces.");
}
}
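For illustration, the new entry points can also be driven programmatically, roughly
as follows. This is a sketch against this revision; the segment path and URL below
are placeholder values, not part of the commit.

  import java.io.File;
  import java.io.OutputStreamWriter;
  import java.util.HashMap;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.io.UTF8;
  import org.apache.nutch.segment.SegmentReader;
  import org.apache.nutch.util.NutchConfiguration;

  public class SegmentReaderDemo {
    public static void main(String[] args) throws Exception {
      Configuration conf = NutchConfiguration.create();
      // read all six segment parts (co, fe, ge, pa, pd, pt)
      SegmentReader reader = new SegmentReader(conf, true, true, true, true, true, true);
      // dump a whole segment to a fresh output directory (placeholder paths)
      reader.dump(new File("crawl/segments/20060411162344"), new File("segdump_out"));
      // retrieve and print all records stored for a single URL (placeholder URL)
      reader.get(new File("crawl/segments/20060411162344"),
                 new UTF8("http://www.example.com/"),
                 new OutputStreamWriter(System.out, "UTF-8"),
                 new HashMap());
    }
  }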