You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by si...@apache.org on 2006/06/27 21:34:20 UTC

svn commit: r417567 - /lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java

Author: siren
Date: Tue Jun 27 12:34:20 2006
New Revision: 417567

URL: http://svn.apache.org/viewvc?rev=417567&view=rev
Log:
NUTCH-306 fix for concurrency problem contributed by Grant Glouser 

Modified:
    lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=417567&r1=417566&r2=417567&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue Jun 27 12:34:20 2006
@@ -82,7 +82,7 @@
                Runnable {
 
     private InetSocketAddress[] defaultAddresses;
-    private InetSocketAddress[] liveAddresses;
+    private boolean[] liveServer;
     private HashMap segmentToAddress = new HashMap();
     
     private boolean running = true;
@@ -128,6 +128,7 @@
     public Client(InetSocketAddress[] addresses, Configuration conf) throws IOException {
       this.conf = conf;
       this.defaultAddresses = addresses;
+      this.liveServer = new boolean[addresses.length];
       updateSegments();
       setDaemon(true);
       start();
@@ -162,7 +163,9 @@
       
       int liveServers=0;
       int liveSegments=0;
-      Vector liveAddresses=new Vector();
+      
+      // Create new array of flags so they can all be updated at once.
+      boolean[] updatedLiveServer = new boolean[defaultAddresses.length];
       
       // build segmentToAddress map
       Object[][] params = new Object[defaultAddresses.length][0];
@@ -173,6 +176,7 @@
         InetSocketAddress addr = defaultAddresses[i];
         String[] segments = results[i];
         if (segments == null) {
+          updatedLiveServer[i] = false;
           if (LOG.isWarnEnabled()) {
             LOG.warn("Client: no segments from: " + addr);
           }
@@ -184,13 +188,13 @@
           }
           segmentToAddress.put(segments[j], addr);
         }
-        liveAddresses.add(addr);
+        updatedLiveServer[i] = true;
         liveServers++;
         liveSegments+=segments.length;
       }
 
-      this.liveAddresses = (InetSocketAddress[]) // update liveAddresses
-      liveAddresses.toArray(new InetSocketAddress[liveAddresses.size()]);
+      // Now update live server flags.
+      this.liveServer = updatedLiveServer;
 
       if (LOG.isInfoEnabled()) {
         LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments.");
@@ -206,7 +210,26 @@
     public Hits search(final Query query, final int numHits,
                        final String dedupField, final String sortField,
                        final boolean reverse) throws IOException {
-      long totalHits = 0;
+      // Get the list of live servers.  It would be nice to build this
+      // list in updateSegments(), but that would create concurrency issues.
+      // We grab a local reference to the live server flags in case the
+      // field is reassigned while we are building our list of liveAddresses.
+      boolean[] savedLiveServer = this.liveServer;
+      int numLive = 0;
+      for (int i = 0; i < savedLiveServer.length; i++) {
+        if (savedLiveServer[i])
+          numLive++;
+      }
+      InetSocketAddress[] liveAddresses = new InetSocketAddress[numLive];
+      int[] liveIndexNos = new int[numLive];
+      int k = 0;
+      for (int i = 0; i < savedLiveServer.length; i++) {
+        if (savedLiveServer[i]) {
+          liveAddresses[k] = defaultAddresses[i];
+          liveIndexNos[k] = i;
+          k++;
+        }
+      }
 
       Object[][] params = new Object[liveAddresses.length][5];
       for (int i = 0; i < params.length; i++) {
@@ -230,6 +253,7 @@
         queue = new TreeSet();
       }
       
+      long totalHits = 0;
       Comparable maxValue = null;
       for (int i = 0; i < results.length; i++) {
         Hits hits = results[i];
@@ -241,7 +265,7 @@
               ((reverse || sortField == null)
                ? h.getSortValue().compareTo(maxValue) >= 0
                : h.getSortValue().compareTo(maxValue) <= 0)) {
-            queue.add(new Hit(i, h.getIndexDocNo(),
+            queue.add(new Hit(liveIndexNos[i], h.getIndexDocNo(),
                               h.getSortValue(), h.getDedupValue()));
             if (queue.size() > numHits) {         // if hit queue overfull
               queue.remove(queue.last());         // remove lowest in hit queue
@@ -255,7 +279,7 @@
     
     private Protocol getRemote(Hit hit) {
       return (Protocol)
-        RPC.getProxy(Protocol.class, liveAddresses[hit.getIndexNo()], conf);
+        RPC.getProxy(Protocol.class, defaultAddresses[hit.getIndexNo()], conf);
     }
 
     private Protocol getRemote(HitDetails hit) {
@@ -276,7 +300,7 @@
       InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
       Object[][] params = new Object[hits.length][1];
       for (int i = 0; i < hits.length; i++) {
-        addrs[i] = liveAddresses[hits[i].getIndexNo()];
+        addrs[i] = defaultAddresses[hits[i].getIndexNo()];
         params[i][0] = hits[i];
       }
       return (HitDetails[])RPC.call(DETAILS, params, addrs, conf);
@@ -368,7 +392,7 @@
           updateSegments();
         } catch (IOException ioe) {
           if (LOG.isWarnEnabled()) { LOG.warn("No search servers available!"); }
-          liveAddresses=new InetSocketAddress[0];
+          liveServer = new boolean[defaultAddresses.length];
         }
       }
     }