You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") did not survive the plain-text conversion.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/05/03 23:05:13 UTC
svn commit: r167994 - /incubator/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
Author: cutting
Date: Tue May 3 14:05:12 2005
New Revision: 167994
URL: http://svn.apache.org/viewcvs?rev=167994&view=rev
Log:
Rewrite distributed search to use RPC.
Modified:
incubator/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
Modified: incubator/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
URL: http://svn.apache.org/viewcvs/incubator/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=167994&r1=167993&r2=167994&view=diff
==============================================================================
--- incubator/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original)
+++ incubator/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue May 3 14:05:12 2005
@@ -20,11 +20,13 @@
import java.io.*;
import java.util.*;
import java.util.logging.Logger;
+import java.lang.reflect.Method;
import org.apache.nutch.parse.ParseData;
import org.apache.nutch.parse.ParseText;
import org.apache.nutch.util.LogFormatter;
import org.apache.nutch.io.*;
+import org.apache.nutch.ipc.RPC;
/** Implements the search API over IPC connnections. */
public class DistributedSearch {
@@ -33,231 +35,18 @@
private DistributedSearch() {} // no public ctor
- // op codes for IPC calls
- private static final byte OP_SEGMENTS = (byte)0;
- private static final byte OP_SEARCH = (byte)1;
- private static final byte OP_EXPLAIN = (byte)2;
- private static final byte OP_DETAILS = (byte)3;
- private static final byte OP_SUMMARY = (byte)4;
- private static final byte OP_CONTENT = (byte)5;
- private static final byte OP_ANCHORS = (byte)6;
- private static final byte OP_PARSEDATA = (byte)7;
- private static final byte OP_PARSETEXT = (byte)8;
- private static final byte OP_FETCHDATE = (byte)9;
-
- /** Names of the op codes. */
- private static final String[] OP_NAMES = new String[10];
- static {
- OP_NAMES[OP_SEGMENTS] = "getSegmentNames";
- OP_NAMES[OP_SEARCH] = "search";
- OP_NAMES[OP_EXPLAIN] = "getExplanation";
- OP_NAMES[OP_DETAILS] = "getDetails";
- OP_NAMES[OP_SUMMARY] = "getSummary";
- OP_NAMES[OP_CONTENT] = "getContent";
- OP_NAMES[OP_ANCHORS] = "getAnchors";
- OP_NAMES[OP_PARSEDATA] = "getParseData";
- OP_NAMES[OP_PARSETEXT] = "getParseText";
- OP_NAMES[OP_FETCHDATE] = "getFetchDate";
- }
-
- /** The parameter passed with IPC requests. Public only so that {@link
- * Server} can construct instances. */
- public static class Param implements Writable {
- private byte op; // the op code
- private Writable first; // the first operand
- private Writable second; // the second operand
-
- public Param() {}
-
- Param(byte op, Writable first) {
- this(op, first, NullWritable.get());
- }
-
- Param(byte op, Writable first, Writable second) {
- this.op = op;
- this.first = first;
- this.second = second;
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeByte(op);
- first.write(out);
- second.write(out);
- }
-
- public void readFields(DataInput in) throws IOException {
- op = in.readByte();
-
- switch (op) {
- case OP_SEGMENTS:
- first = NullWritable.get();
- second = NullWritable.get();
- break;
- case OP_SEARCH:
- first = new Query();
- second = new IntWritable();
- break;
- case OP_EXPLAIN:
- first = new Query();
- second = new Hit();
- break;
- case OP_DETAILS:
- first = new Hit();
- second = NullWritable.get();
- break;
- case OP_SUMMARY:
- first = new HitDetails();
- second = new Query();
- break;
- case OP_CONTENT:
- case OP_ANCHORS:
- case OP_PARSEDATA:
- case OP_PARSETEXT:
- case OP_FETCHDATE:
- first = new HitDetails();
- second = NullWritable.get();
- break;
- default:
- throw new RuntimeException("Unknown op code: " + op);
- }
-
- first.readFields(in);
- second.readFields(in);
- }
- }
-
- /** The parameter returned with IPC responses. Public only so that {@link
- * Client} can construct instances. */
- public static class Result implements Writable {
- private byte op;
- private Writable value;
-
- public Result() {}
-
- Result(byte op, Writable value) {
- this.op = op;
- this.value = value;
- }
-
- public void write(DataOutput out) throws IOException {
- out.writeByte(op);
- value.write(out);
- }
-
- public void readFields(DataInput in) throws IOException {
- op = in.readByte();
-
- switch (op) {
- case OP_SEGMENTS:
- value = new ArrayWritable(UTF8.class);
- break;
- case OP_SEARCH:
- value = new Hits();
- break;
- case OP_EXPLAIN:
- value = new UTF8();
- break;
- case OP_DETAILS:
- value = new HitDetails();
- break;
- case OP_SUMMARY:
- value = new UTF8();
- break;
- case OP_CONTENT:
- value = new BytesWritable();
- break;
- case OP_ANCHORS:
- value = new ArrayWritable(UTF8.class);
- break;
- case OP_PARSEDATA:
- value = new ParseData();
- break;
- case OP_PARSETEXT:
- value = new ParseText();
- break;
- case OP_FETCHDATE:
- value = new LongWritable();
- break;
- default:
- throw new RuntimeException("Unknown op code: " + op);
- }
+ /** The distributed search protocol. */
+ public interface Protocol
+ extends Searcher, HitDetailer, HitSummarizer, HitContent {
- value.readFields(in);
- }
+ /** The name of the segments searched by this node. */
+ String[] getSegmentNames();
}
/** The search server. */
- public static class Server extends org.apache.nutch.ipc.Server {
- private NutchBean bean;
-
- /** Construct a search server on the index and segments in the named
- * directory, listening on the named port. */
- public Server(File directory, int port) throws IOException {
- super(port, Param.class, 10);
- this.bean = new NutchBean(directory);
- }
-
- public Writable call(Writable param) throws IOException {
- Param p = (Param)param;
- logRequest(p);
- Writable value;
- switch (p.op) {
- case OP_SEGMENTS:
- value = new ArrayWritable(bean.getSegmentNames());
- break;
- case OP_SEARCH:
- value = bean.search((Query)p.first, ((IntWritable)p.second).get());
- break;
- case OP_EXPLAIN:
- value = new UTF8(bean.getExplanation((Query)p.first, (Hit)p.second));
- break;
- case OP_DETAILS:
- value = bean.getDetails((Hit)p.first);
- break;
- case OP_SUMMARY:
- value = new UTF8(bean.getSummary((HitDetails)p.first,(Query)p.second));
- break;
- case OP_CONTENT:
- value = new BytesWritable(bean.getContent((HitDetails)p.first));
- break;
- case OP_ANCHORS:
- value = new ArrayWritable(bean.getAnchors((HitDetails)p.first));
- break;
- case OP_PARSEDATA:
- value = bean.getParseData((HitDetails)p.first);
- break;
- case OP_PARSETEXT:
- value = bean.getParseText((HitDetails)p.first);
- break;
- case OP_FETCHDATE:
- value = new LongWritable(bean.getFetchDate((HitDetails)p.first));
- break;
- default:
- throw new RuntimeException("Unknown op code: " + p.op);
- }
-
- //LOG.info("Result: "+value);
+ public static class Server {
- return new Result(p.op, value);
-
- }
-
- private static void logRequest(Param p) {
- StringBuffer buffer = new StringBuffer();
- buffer.append(Thread.currentThread().getName());
- buffer.append(": ");
- buffer.append(OP_NAMES[p.op]);
- buffer.append("(");
- if (p.first != NullWritable.get()) {
- buffer.append(p.first);
- if (p.second != NullWritable.get()) {
- buffer.append(", ");
- buffer.append(p.second);
- }
- }
- buffer.append(")");
- LOG.info(buffer.toString());
- }
+ private Server() {}
/** Runs a search server. */
public static void main(String[] args) throws Exception {
@@ -271,8 +60,9 @@
int port = Integer.parseInt(args[0]);
File directory = new File(args[1]);
- Server server = new Server(directory, port);
- //server.setTimeout(Integer.MAX_VALUE);
+ NutchBean bean = new NutchBean(directory);
+
+ org.apache.nutch.ipc.Server server = RPC.getServer(bean, port, 10, true);
server.start();
server.join();
}
@@ -280,22 +70,14 @@
}
/** The search client. */
- public static class Client extends org.apache.nutch.ipc.Client
+ public static class Client extends Thread
implements Searcher, HitDetailer, HitSummarizer, HitContent, Runnable {
- private InetSocketAddress[] addresses=new InetSocketAddress[0];
- private InetSocketAddress[] defaultaddresses;
+ private InetSocketAddress[] defaultAddresses;
+ private InetSocketAddress[] liveAddresses;
private HashMap segmentToAddress = new HashMap();
- /**
- * Flag for watchdog, true=keep running, false=stop
- */
- private boolean shouldrun=true;
-
- /**
- * Backgroudthread that polls search servers.
- */
- private Thread watchdog;
+ private boolean running = true;
/** Construct a client talking to servers listed in the named file.
* Each line in the file lists a server hostname and port, separated by
@@ -328,80 +110,99 @@
/** Construct a client talking to the named servers. */
public Client(InetSocketAddress[] addresses) throws IOException {
- super(Result.class);
- this.defaultaddresses = addresses;
- watchdog=new Thread(this);
- watchdog.start();
+ this.defaultAddresses = addresses;
+ updateSegments();
+ setDaemon(true);
+ start();
}
- /** Updates segments
+ private static final Method GET_SEGMENTS;
+ private static final Method SEARCH;
+ private static final Method DETAILS;
+ private static final Method SUMMARY;
+ static {
+ try {
+ GET_SEGMENTS = Protocol.class.getMethod
+ ("getSegmentNames", new Class[] {});
+ SEARCH = Protocol.class.getMethod
+ ("search", new Class[] { Query.class, Integer.TYPE});
+ DETAILS = Protocol.class.getMethod
+ ("getDetails", new Class[] { Hit.class});
+ SUMMARY = Protocol.class.getMethod
+ ("getSummary", new Class[] { HitDetails.class, Query.class});
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ /** Updates segment names.
*
* @throws IOException
*/
public void updateSegments() throws IOException {
- int statServers=0;
- int statSegments=0;
- Vector aliveaddresses=new Vector();
+ int liveServers=0;
+ int liveSegments=0;
+ Vector liveAddresses=new Vector();
// build segmentToAddress map
- Param param = new Param(OP_SEGMENTS, NullWritable.get());
- Writable[] params = new Writable[defaultaddresses.length];
- for (int i = 0; i < params.length; i++) {
- params[i] = param; // build param for parallel call
- }
- Writable[] results = call(params, defaultaddresses); // make parallel call
+ Object[][] params = new Object[defaultAddresses.length][0];
+ String[][] results =
+ (String[][])RPC.call(GET_SEGMENTS, params, defaultAddresses);
for (int i = 0; i < results.length; i++) { // process results of call
- Result result = (Result)results[i];
- if (result == null) {
- LOG.warning("Client: no segments from: " + defaultaddresses[i]);
+ InetSocketAddress addr = defaultAddresses[i];
+ String[] segments = results[i];
+ if (segments == null) {
+ LOG.warning("Client: no segments from: " + addr);
continue;
}
- String[] segments = ((ArrayWritable)result.value).toStrings();
for (int j = 0; j < segments.length; j++) {
- LOG.info("Client: segment "+segments[j]+" at "+defaultaddresses[i]);
- segmentToAddress.put(segments[j], defaultaddresses[i]);
- aliveaddresses.add(defaultaddresses[i]);
+ LOG.info("Client: segment "+segments[j]+" at "+addr);
+ segmentToAddress.put(segments[j], addr);
}
- statServers++;
- statSegments+=segments.length;
+ liveAddresses.add(addr);
+ liveServers++;
+ liveSegments+=segments.length;
}
- addresses=(InetSocketAddress[])aliveaddresses.toArray(new InetSocketAddress[statServers]);
- LOG.info("STATS: " + statServers + " servers / " + statSegments + " segments online.");
+ this.liveAddresses = (InetSocketAddress[]) // update liveAddresses
+ liveAddresses.toArray(new InetSocketAddress[liveAddresses.size()]);
+
+ LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments.");
}
/** Return the names of segments searched. */
public String[] getSegmentNames() {
- return (String[])segmentToAddress.keySet().toArray(new String[segmentToAddress.size()]);
+ return (String[])
+ segmentToAddress.keySet().toArray(new String[segmentToAddress.size()]);
}
public Hits search(Query query, int numHits) throws IOException {
long totalHits = 0;
- Hits[] segmentHits = new Hits[addresses.length];
+ Hits[] segmentHits = new Hits[liveAddresses.length];
- Param param = new Param(OP_SEARCH, query, new IntWritable(numHits));
- Writable[] params = new Writable[addresses.length];
+ Object[][] params = new Object[liveAddresses.length][2];
for (int i = 0; i < params.length; i++) {
- params[i] = param; // build param for parallel call
+ params[i][0] = query;
+ params[i][1] = new Integer(numHits);
}
- Writable[] results = call(params, addresses); // make parallel call
+ Hits[] results = (Hits[])RPC.call(SEARCH, params, liveAddresses);
TreeSet queue = new TreeSet(); // cull top hits from results
- float minScore = 0.0f;
+ Comparable minValue = null;
for (int i = 0; i < results.length; i++) {
- Result result = (Result)results[i];
- if (result == null) continue;
- Hits hits = (Hits)result.value;
+ Hits hits = results[i];
+ if (hits == null) continue;
totalHits += hits.getTotal();
for (int j = 0; j < hits.getLength(); j++) {
Hit h = hits.getHit(j);
- if (h.getScore() >= minScore) {
- queue.add(new Hit(i, h.getIndexDocNo(),h.getScore(),h.getSite()));
+ if (minValue == null || h.getSite().compareTo(minValue) >= 0) {
+ queue.add(new Hit(i, h.getIndexDocNo(), h.getSite()));
if (queue.size() > numHits) { // if hit queue overfull
queue.remove(queue.last()); // remove lowest in hit queue
- minScore = ((Hit)queue.last()).getScore(); // reset minScore
+ minValue = ((Hit)queue.last()).getSite(); // reset minValue
}
}
}
@@ -409,99 +210,72 @@
return new Hits(totalHits, (Hit[])queue.toArray(new Hit[queue.size()]));
}
+ private Protocol getRemote(Hit hit) {
+ return (Protocol)
+ RPC.getProxy(Protocol.class, liveAddresses[hit.getIndexNo()]);
+ }
+
+ private Protocol getRemote(HitDetails hit) {
+ InetSocketAddress address =
+ (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
+ return (Protocol)RPC.getProxy(Protocol.class, address);
+ }
+
public String getExplanation(Query query, Hit hit) throws IOException {
- Param param = new Param(OP_EXPLAIN, query, hit);
- Result result = (Result)call(param, addresses[hit.getIndexNo()]);
- return result.value.toString();
+ return getRemote(hit).getExplanation(query, hit);
}
public HitDetails getDetails(Hit hit) throws IOException {
- Param param = new Param(OP_DETAILS, hit);
- Result result = (Result)call(param, addresses[hit.getIndexNo()]);
- return (HitDetails)result.value;
+ return getRemote(hit).getDetails(hit);
}
public HitDetails[] getDetails(Hit[] hits) throws IOException {
- Writable[] params = new Writable[hits.length];
InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
+ Object[][] params = new Object[hits.length][1];
for (int i = 0; i < hits.length; i++) {
- params[i] = new Param(OP_DETAILS, hits[i]);
- addrs[i] = addresses[hits[i].getIndexNo()];
- }
- Writable[] writables = call(params, addrs);
- HitDetails[] results = new HitDetails[writables.length];
- for (int i = 0; i < results.length; i++) {
- results[i] = (HitDetails)((Result)writables[i]).value;
+ addrs[i] = liveAddresses[hits[i].getIndexNo()];
+ params[i][0] = hits[i];
}
- return results;
+ return (HitDetails[])RPC.call(DETAILS, params, addrs);
}
public String getSummary(HitDetails hit, Query query) throws IOException {
- Param param = new Param(OP_SUMMARY, hit, query);
- InetSocketAddress address =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- Result result = (Result)call(param, address);
- return result.value.toString();
+ return getRemote(hit).getSummary(hit, query);
}
public String[] getSummary(HitDetails[] hits, Query query)
throws IOException {
- Writable[] params = new Writable[hits.length];
InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
+ Object[][] params = new Object[hits.length][2];
for (int i = 0; i < hits.length; i++) {
HitDetails hit = hits[i];
- params[i] = new Param(OP_SUMMARY, hit, query);
addrs[i] =
(InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
+ params[i][0] = hit;
+ params[i][1] = query;
}
- Writable[] results = call(params, addrs);
- String[] strings = new String[results.length];
- for (int i = 0; i < results.length; i++) {
- if (results[i] != null)
- strings[i] = ((Result)results[i]).value.toString();
- }
- return strings;
+ return (String[])RPC.call(SUMMARY, params, addrs);
}
public byte[] getContent(HitDetails hit) throws IOException {
- Param param = new Param(OP_CONTENT, hit);
- InetSocketAddress address =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- Result result = (Result)call(param, address);
- return ((BytesWritable)result.value).get();
+ return getRemote(hit).getContent(hit);
}
public ParseData getParseData(HitDetails hit) throws IOException {
- Param param = new Param(OP_PARSEDATA, hit);
- InetSocketAddress address =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- Result result = (Result)call(param, address);
- return (ParseData)result.value;
- }
+ return getRemote(hit).getParseData(hit);
+ }
public ParseText getParseText(HitDetails hit) throws IOException {
- Param param = new Param(OP_PARSETEXT, hit);
- InetSocketAddress address =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- Result result = (Result)call(param, address);
- return (ParseText)result.value;
+ return getRemote(hit).getParseText(hit);
}
public String[] getAnchors(HitDetails hit) throws IOException {
- Param param = new Param(OP_ANCHORS, hit);
- InetSocketAddress address =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- Result result = (Result)call(param, address);
- return ((ArrayWritable)result.value).toStrings();
+ return getRemote(hit).getAnchors(hit);
}
public long getFetchDate(HitDetails hit) throws IOException {
- Param param = new Param(OP_FETCHDATE, hit);
- InetSocketAddress address =
- (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
- Result result = (Result)call(param, address);
- return ((LongWritable)result.value).get();
+ return getRemote(hit).getFetchDate(hit);
}
public static void main(String[] args) throws Exception {
@@ -532,30 +306,28 @@
}
public void run() {
- while (shouldrun=true){
- try{
- LOG.info("Querying segments from search servers");
- updateSegments();
- } catch (IOException ioe) {
- LOG.warning("No search servers available!");
- addresses=new InetSocketAddress[0];
- }
+ while (running){
try{
Thread.sleep(10000);
} catch (InterruptedException ie){
LOG.info("Thread sleep interrupted.");
}
+ try{
+ LOG.info("Querying segments from search servers...");
+ updateSegments();
+ } catch (IOException ioe) {
+ LOG.warning("No search servers available!");
+ liveAddresses=new InetSocketAddress[0];
+ }
}
}
/**
* Stops the watchdog thread.
*/
- public void stop() {
- super.stop();
- LOG.info("stopping watchdog.");
- shouldrun=false;
- watchdog.interrupt();
+ public void close() {
+ running = false;
+ interrupt();
}
}
}