You are viewing a plain text version of this content. The canonical link was lost in this plain-text copy; the commit itself is available at the ViewVC URL below.
Posted to commits@nutch.apache.org by ku...@apache.org on 2007/12/04 20:13:29 UTC

svn commit: r601043 - in /lucene/nutch/trunk: CHANGES.txt src/java/org/apache/nutch/searcher/DistributedSearch.java src/test/org/apache/nutch/searcher/DistributedSearchTest.java src/test/org/apache/nutch/searcher/TestDistributedSearch.java

Author: kubes
Date: Tue Dec  4 11:13:28 2007
New Revision: 601043

URL: http://svn.apache.org/viewvc?rev=601043&view=rev
Log:
NUTCH-581 - DistributedSearch does not update search servers added to search-servers.txt on the fly.  This allows search servers to be added and removed on the fly.  Thanks Rohan.

Added:
    lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java
      - copied, changed from r600001, lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java
Removed:
    lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java
Modified:
    lucene/nutch/trunk/CHANGES.txt
    lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java

Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=601043&r1=601042&r2=601043&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Dec  4 11:13:28 2007
@@ -173,6 +173,9 @@
     index-basic plugin. For backwards compatibility, add index-anchor plugin to 
     nutch-site.xml plugin.includes. (kubes)
 
+60. NUTCH-581 - DistributedSearch does not update search servers added to 
+    search-servers.txt on the fly.  (Rohan Mehta via kubes)
+
 Release 0.9 - 2007-04-02
 
  1. Changed log4j confiquration to log to stdout on commandline

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=601043&r1=601042&r2=601043&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue Dec  4 11:13:28 2007
@@ -17,24 +17,27 @@
 
 package org.apache.nutch.searcher;
 
-import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.*;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.nutch.parse.ParseData;
-import org.apache.nutch.parse.ParseText;
-import org.apache.nutch.crawl.Inlinks;
-
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
 import org.apache.nutch.util.NutchConfiguration;
 
 /** Implements the search API over IPC connnections. */
@@ -94,12 +97,19 @@
     private boolean running = true;
     private Configuration conf;
 
+    private Path file;
+    private long timestamp;
+    private FileSystem fs;
+    
     /** Construct a client talking to servers listed in the named file.
      * Each line in the file lists a server hostname and port, separated by
      * whitespace. 
      */
-    public Client(Path file, Configuration conf) throws IOException {
+    public Client(Path file, Configuration conf) 
+      throws IOException {
       this(readConfig(file, conf), conf);
+      this.file = file;
+      this.timestamp = fs.getFileStatus(file).getModificationTime();
     }
 
     private static InetSocketAddress[] readConfig(Path path, Configuration conf)
@@ -135,6 +145,7 @@
       this.conf = conf;
       this.defaultAddresses = addresses;
       this.liveServer = new boolean[addresses.length];
+      this.fs = FileSystem.get(conf);
       updateSegments();
       setDaemon(true);
       start();
@@ -160,6 +171,24 @@
       }
     }
 
+    /**
+     * Check to see if search-servers file has been modified
+     * 
+     * @throws IOException
+     */
+    public boolean isFileModified()
+      throws IOException {
+
+      if (file != null) {        
+        long modTime = fs.getFileStatus(file).getModificationTime();
+        if (timestamp < modTime) {
+          this.timestamp = fs.getFileStatus(file).getModificationTime();
+          return true;
+        }
+      }
+
+      return false;
+    }
 
     /** Updates segment names.
      * 
@@ -167,8 +196,12 @@
      */
     public void updateSegments() throws IOException {
       
-      int liveServers=0;
-      int liveSegments=0;
+      int liveServers = 0;
+      int liveSegments = 0;
+      
+      if (isFileModified()) {
+        defaultAddresses = readConfig(file, conf);
+      }
       
       // Create new array of flags so they can all be updated at once.
       boolean[] updatedLiveServer = new boolean[defaultAddresses.length];
@@ -188,15 +221,17 @@
           }
           continue;
         }
+        
         for (int j = 0; j < segments.length; j++) {
           if (LOG.isTraceEnabled()) {
             LOG.trace("Client: segment "+segments[j]+" at "+addr);
           }
           segmentToAddress.put(segments[j], addr);
         }
+        
         updatedLiveServer[i] = true;
         liveServers++;
-        liveSegments+=segments.length;
+        liveSegments += segments.length;
       }
 
       // Now update live server flags.
@@ -413,5 +448,9 @@
       running = false;
       interrupt();
     }
+
+    public boolean[] getLiveServer() {
+      return liveServer;
+    }
   }
-}
+}
\ No newline at end of file

Copied: lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java (from r600001, lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java)
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java?p2=lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java&p1=lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java&r1=600001&r2=601043&rev=601043&view=diff
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java Tue Dec  4 11:13:28 2007
@@ -16,53 +16,129 @@
 */
 package org.apache.nutch.searcher;
 
+import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.net.InetSocketAddress;
 
+import junit.framework.TestCase;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.Server;
 import org.apache.nutch.searcher.DistributedSearch.Client;
 import org.apache.nutch.util.NutchConfiguration;
 
-import junit.framework.TestCase;
-
-public class DistributedSearchTest extends TestCase {
+public class TestDistributedSearch
+  extends TestCase {
 
   private static final int DEFAULT_PORT = 60000;
   private static final String DISTRIBUTED_SEARCH_TEST_PORT = "distributed.search.test.port";
-  Configuration conf;
-  Path searchdir=new Path("build/test/data/testcrawl/");
-  Server server;
-  
-  protected void setUp() throws Exception {
-    super.setUp();
-    conf=NutchConfiguration.create();
+
+  private static final int DEFAULT_PORT1 = 60001;
+  private static final String DISTRIBUTED_SEARCH_TEST_PORT1 = "distributed.search.test.port1";
+  
+  private static final int DEFAULT_PORT2 = 60002;
+  private static final String DISTRIBUTED_SEARCH_TEST_PORT2 = "distributed.search.test.port2";
+  
+  Path searchdir = new Path("build/test/data/testcrawl/");
+
+  public void testDistibutedSearch() 
+    throws IOException {
+  
+    Configuration conf = NutchConfiguration.create();
+    
     //set up server & start it
-    server=DistributedSearch.Server.getServer(conf, searchdir, conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT));
+    Server server = DistributedSearch.Server.getServer(conf, searchdir, 
+      conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT));
     server.start();
-  }
-
-  protected void tearDown() throws Exception {
-    super.tearDown();
-    if(server!=null){
-      //stop server
-      //server.stop();
+    
+    int port = conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT);
+    
+    InetSocketAddress[] addresses = new InetSocketAddress[1];
+    addresses[0] = new InetSocketAddress("localhost", port);
+    
+    Client c = new DistributedSearch.Client(addresses, conf);
+  
+    Query query = Query.parse("apache", conf);
+    Hits hits = c.search(query, 5, null, null, false);
+    c.getDetails(hits.getHit(0));
+    assertTrue(hits.getTotal() > 0);
+    
+    if(server != null){
+      server.stop();
     }
   }
-
-  public void testDistibutedSearch() throws IOException{
-
-    int port=conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT);
+  
+  public void testUpdateSegments() 
+    throws IOException {
+    
+    // Start 2 search servers, each on its own port (DEFAULT_PORT1 and
+    // DEFAULT_PORT2); this test has no setUp() fixture
+    
+    Configuration conf = NutchConfiguration.create();
+    
+    Server server1 = DistributedSearch.Server.getServer(conf, searchdir, 
+      conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT1, DEFAULT_PORT1));
+    
+    Server server2 = DistributedSearch.Server.getServer(conf, searchdir, 
+      conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT2, DEFAULT_PORT2));
+    
+    server1.start();
+    server2.start();
+    
+    /* create a new file search-server.txt
+     * listing 1 server at DEFAULT_PORT1 (60001)
+     */
+    FileSystem fs = FileSystem.get(conf);
+    Path testServersPath = new Path(searchdir, "search-server.txt");
+    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(
+      testServersPath, true)));    
+    bw.write("localhost " + DEFAULT_PORT1 + "\n");    
+    bw.flush();
+    bw.close();
+  
+    /* 
+     * Check if it found the server
+     */
+    Client c = new DistributedSearch.Client(testServersPath, conf);
+    boolean[] liveServers = c.getLiveServer();
+    assertEquals(liveServers.length, 1);
+  
+    /* Add both servers, at DEFAULT_PORT1 (60001) & DEFAULT_PORT2 (60002),
+     * to the search-server.txt file
+     */
+    
+    // pause so the rewritten file's modification time is strictly newer
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
     
-    InetSocketAddress[] addresses=new InetSocketAddress[1];
-    addresses[0]=new InetSocketAddress("localhost", port);
+    bw = new BufferedWriter(new OutputStreamWriter(fs.create(testServersPath,
+      true)));    
+    bw.write("localhost " + DEFAULT_PORT1 + "\n");    
+    bw.write("localhost " + DEFAULT_PORT2 + "\n");
+    bw.flush();
+    bw.close();
+    
+    // Check if it found both the servers
+    c.updateSegments();
     
-    Client c=new DistributedSearch.Client(addresses, conf);
 
-    Query query=Query.parse("apache", conf);
-    Hits hits=c.search(query, 5, null, null, false);
-    c.getDetails(hits.getHit(0));
-    assertTrue(hits.getTotal()>0);
+    liveServers = c.getLiveServer();
+    assertEquals(liveServers.length, 2);
+  
+    if (server1 != null) {
+      server1.stop();
+    }
+    if (server2 != null) {
+      server2.stop();
+    }
+    
+    fs.delete(testServersPath);    
   }
 }