You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by si...@apache.org on 2006/06/27 21:34:20 UTC
svn commit: r417567 - /lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
Author: siren
Date: Tue Jun 27 12:34:20 2006
New Revision: 417567
URL: http://svn.apache.org/viewvc?rev=417567&view=rev
Log:
NUTCH-306 fix for concurrency problem contributed by Grant Glouser
Modified:
lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=417567&r1=417566&r2=417567&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue Jun 27 12:34:20 2006
@@ -82,7 +82,7 @@
Runnable {
private InetSocketAddress[] defaultAddresses;
- private InetSocketAddress[] liveAddresses;
+ private boolean[] liveServer;
private HashMap segmentToAddress = new HashMap();
private boolean running = true;
@@ -128,6 +128,7 @@
public Client(InetSocketAddress[] addresses, Configuration conf) throws IOException {
this.conf = conf;
this.defaultAddresses = addresses;
+ this.liveServer = new boolean[addresses.length];
updateSegments();
setDaemon(true);
start();
@@ -162,7 +163,9 @@
int liveServers=0;
int liveSegments=0;
- Vector liveAddresses=new Vector();
+
+ // Create new array of flags so they can all be updated at once.
+ boolean[] updatedLiveServer = new boolean[defaultAddresses.length];
// build segmentToAddress map
Object[][] params = new Object[defaultAddresses.length][0];
@@ -173,6 +176,7 @@
InetSocketAddress addr = defaultAddresses[i];
String[] segments = results[i];
if (segments == null) {
+ updatedLiveServer[i] = false;
if (LOG.isWarnEnabled()) {
LOG.warn("Client: no segments from: " + addr);
}
@@ -184,13 +188,13 @@
}
segmentToAddress.put(segments[j], addr);
}
- liveAddresses.add(addr);
+ updatedLiveServer[i] = true;
liveServers++;
liveSegments+=segments.length;
}
- this.liveAddresses = (InetSocketAddress[]) // update liveAddresses
- liveAddresses.toArray(new InetSocketAddress[liveAddresses.size()]);
+ // Now update live server flags.
+ this.liveServer = updatedLiveServer;
if (LOG.isInfoEnabled()) {
LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments.");
@@ -206,7 +210,26 @@
public Hits search(final Query query, final int numHits,
final String dedupField, final String sortField,
final boolean reverse) throws IOException {
- long totalHits = 0;
+ // Get the list of live servers. It would be nice to build this
+ // list in updateSegments(), but that would create concurrency issues.
+ // We grab a local reference to the live server flags in case the field
+ // is reassigned while we are building our list of liveAddresses.
+ boolean[] savedLiveServer = this.liveServer;
+ int numLive = 0;
+ for (int i = 0; i < savedLiveServer.length; i++) {
+ if (savedLiveServer[i])
+ numLive++;
+ }
+ InetSocketAddress[] liveAddresses = new InetSocketAddress[numLive];
+ int[] liveIndexNos = new int[numLive];
+ int k = 0;
+ for (int i = 0; i < savedLiveServer.length; i++) {
+ if (savedLiveServer[i]) {
+ liveAddresses[k] = defaultAddresses[i];
+ liveIndexNos[k] = i;
+ k++;
+ }
+ }
Object[][] params = new Object[liveAddresses.length][5];
for (int i = 0; i < params.length; i++) {
@@ -230,6 +253,7 @@
queue = new TreeSet();
}
+ long totalHits = 0;
Comparable maxValue = null;
for (int i = 0; i < results.length; i++) {
Hits hits = results[i];
@@ -241,7 +265,7 @@
((reverse || sortField == null)
? h.getSortValue().compareTo(maxValue) >= 0
: h.getSortValue().compareTo(maxValue) <= 0)) {
- queue.add(new Hit(i, h.getIndexDocNo(),
+ queue.add(new Hit(liveIndexNos[i], h.getIndexDocNo(),
h.getSortValue(), h.getDedupValue()));
if (queue.size() > numHits) { // if hit queue overfull
queue.remove(queue.last()); // remove lowest in hit queue
@@ -255,7 +279,7 @@
private Protocol getRemote(Hit hit) {
return (Protocol)
- RPC.getProxy(Protocol.class, liveAddresses[hit.getIndexNo()], conf);
+ RPC.getProxy(Protocol.class, defaultAddresses[hit.getIndexNo()], conf);
}
private Protocol getRemote(HitDetails hit) {
@@ -276,7 +300,7 @@
InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
Object[][] params = new Object[hits.length][1];
for (int i = 0; i < hits.length; i++) {
- addrs[i] = liveAddresses[hits[i].getIndexNo()];
+ addrs[i] = defaultAddresses[hits[i].getIndexNo()];
params[i][0] = hits[i];
}
return (HitDetails[])RPC.call(DETAILS, params, addrs, conf);
@@ -368,7 +392,7 @@
updateSegments();
} catch (IOException ioe) {
if (LOG.isWarnEnabled()) { LOG.warn("No search servers available!"); }
- liveAddresses=new InetSocketAddress[0];
+ liveServer = new boolean[defaultAddresses.length];
}
}
}