You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ku...@apache.org on 2007/12/04 20:13:29 UTC
svn commit: r601043 - in /lucene/nutch/trunk: CHANGES.txt
src/java/org/apache/nutch/searcher/DistributedSearch.java
src/test/org/apache/nutch/searcher/DistributedSearchTest.java
src/test/org/apache/nutch/searcher/TestDistributedSearch.java
Author: kubes
Date: Tue Dec 4 11:13:28 2007
New Revision: 601043
URL: http://svn.apache.org/viewvc?rev=601043&view=rev
Log:
NUTCH-581 - DistributedSearch does not update search servers added to search-servers.txt on the fly. This change makes the client re-read the file when its modification time changes, which allows search servers to be added and removed on the fly. Thanks Rohan.
Added:
lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java
- copied, changed from r600001, lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java
Removed:
lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java
Modified:
lucene/nutch/trunk/CHANGES.txt
lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
Modified: lucene/nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/CHANGES.txt?rev=601043&r1=601042&r2=601043&view=diff
==============================================================================
--- lucene/nutch/trunk/CHANGES.txt (original)
+++ lucene/nutch/trunk/CHANGES.txt Tue Dec 4 11:13:28 2007
@@ -173,6 +173,9 @@
index-basic plugin. For backwards compatibility, add index-anchor plugin to
nutch-site.xml plugin.includes. (kubes)
+60. NUTCH-581 - DistributedSearch does not update search servers added to
+ search-servers.txt on the fly. (Rohan Mehta via kubes)
+
Release 0.9 - 2007-04-02
1. Changed log4j configuration to log to stdout on commandline
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java?rev=601043&r1=601042&r2=601043&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearch.java Tue Dec 4 11:13:28 2007
@@ -17,24 +17,27 @@
package org.apache.nutch.searcher;
-import java.net.InetSocketAddress;
-import java.io.*;
-import java.util.*;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
-import org.apache.nutch.parse.ParseData;
-import org.apache.nutch.parse.ParseText;
-import org.apache.nutch.crawl.Inlinks;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.VersionedProtocol;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-
+import org.apache.nutch.crawl.Inlinks;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
import org.apache.nutch.util.NutchConfiguration;
/** Implements the search API over IPC connections. */
@@ -94,12 +97,19 @@
private boolean running = true;
private Configuration conf;
+ private Path file;
+ private long timestamp;
+ private FileSystem fs;
+
/** Construct a client talking to servers listed in the named file.
* Each line in the file lists a server hostname and port, separated by
* whitespace.
*/
- public Client(Path file, Configuration conf) throws IOException {
+ public Client(Path file, Configuration conf)
+ throws IOException {
this(readConfig(file, conf), conf);
+ this.file = file;
+ this.timestamp = fs.getFileStatus(file).getModificationTime();
}
private static InetSocketAddress[] readConfig(Path path, Configuration conf)
@@ -135,6 +145,7 @@
this.conf = conf;
this.defaultAddresses = addresses;
this.liveServer = new boolean[addresses.length];
+ this.fs = FileSystem.get(conf);
updateSegments();
setDaemon(true);
start();
@@ -160,6 +171,24 @@
}
}
+ /**
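+ * Checks whether the search-servers file has been modified since its
+ * modification time was last recorded. If it has, the recorded timestamp
+ * is updated to the file's current modification time.
+ *
+ * @return true if a servers file is set and its modification time is newer
+ * than the recorded timestamp, false otherwise
+ * @throws IOException if the file status cannot be obtained from the file system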
+ */
+ public boolean isFileModified()
+ throws IOException {
+
+ if (file != null) {
+ long modTime = fs.getFileStatus(file).getModificationTime();
+ if (timestamp < modTime) {
+ this.timestamp = fs.getFileStatus(file).getModificationTime();
+ return true;
+ }
+ }
+
+ return false;
+ }
/** Updates segment names.
*
@@ -167,8 +196,12 @@
*/
public void updateSegments() throws IOException {
- int liveServers=0;
- int liveSegments=0;
+ int liveServers = 0;
+ int liveSegments = 0;
+
+ if (isFileModified()) {
+ defaultAddresses = readConfig(file, conf);
+ }
// Create new array of flags so they can all be updated at once.
boolean[] updatedLiveServer = new boolean[defaultAddresses.length];
@@ -188,15 +221,17 @@
}
continue;
}
+
for (int j = 0; j < segments.length; j++) {
if (LOG.isTraceEnabled()) {
LOG.trace("Client: segment "+segments[j]+" at "+addr);
}
segmentToAddress.put(segments[j], addr);
}
+
updatedLiveServer[i] = true;
liveServers++;
- liveSegments+=segments.length;
+ liveSegments += segments.length;
}
// Now update live server flags.
@@ -413,5 +448,9 @@
running = false;
interrupt();
}
+
+ public boolean[] getLiveServer() {
+ return liveServer;
+ }
}
-}
+}
\ No newline at end of file
Copied: lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java (from r600001, lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java)
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java?p2=lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java&p1=lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java&r1=600001&r2=601043&rev=601043&view=diff
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/searcher/DistributedSearchTest.java (original)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/searcher/TestDistributedSearch.java Tue Dec 4 11:13:28 2007
@@ -16,53 +16,129 @@
*/
package org.apache.nutch.searcher;
+import java.io.BufferedWriter;
import java.io.IOException;
+import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
+import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.Server;
import org.apache.nutch.searcher.DistributedSearch.Client;
import org.apache.nutch.util.NutchConfiguration;
-import junit.framework.TestCase;
-
-public class DistributedSearchTest extends TestCase {
+public class TestDistributedSearch
+ extends TestCase {
private static final int DEFAULT_PORT = 60000;
private static final String DISTRIBUTED_SEARCH_TEST_PORT = "distributed.search.test.port";
- Configuration conf;
- Path searchdir=new Path("build/test/data/testcrawl/");
- Server server;
-
- protected void setUp() throws Exception {
- super.setUp();
- conf=NutchConfiguration.create();
+
+ private static final int DEFAULT_PORT1 = 60001;
+ private static final String DISTRIBUTED_SEARCH_TEST_PORT1 = "distributed.search.test.port1";
+
+ private static final int DEFAULT_PORT2 = 60002;
+ private static final String DISTRIBUTED_SEARCH_TEST_PORT2 = "distributed.search.test.port2";
+
+ Path searchdir = new Path("build/test/data/testcrawl/");
+
+ public void testDistibutedSearch()
+ throws IOException {
+
+ Configuration conf = NutchConfiguration.create();
+
//set up server & start it
- server=DistributedSearch.Server.getServer(conf, searchdir, conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT));
+ Server server = DistributedSearch.Server.getServer(conf, searchdir,
+ conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT));
server.start();
- }
-
- protected void tearDown() throws Exception {
- super.tearDown();
- if(server!=null){
- //stop server
- //server.stop();
+
+ int port = conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT);
+
+ InetSocketAddress[] addresses = new InetSocketAddress[1];
+ addresses[0] = new InetSocketAddress("localhost", port);
+
+ Client c = new DistributedSearch.Client(addresses, conf);
+
+ Query query = Query.parse("apache", conf);
+ Hits hits = c.search(query, 5, null, null, false);
+ c.getDetails(hits.getHit(0));
+ assertTrue(hits.getTotal() > 0);
+
+ if(server != null){
+ server.stop();
}
}
-
- public void testDistibutedSearch() throws IOException{
-
- int port=conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT, DEFAULT_PORT);
+
+ public void testUpdateSegments()
+ throws IOException {
+
+ // Start up two search servers on different ports
+ // (defaults 60001 and 60002).
+
+ Configuration conf = NutchConfiguration.create();
+
+ Server server1 = DistributedSearch.Server.getServer(conf, searchdir,
+ conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT1, DEFAULT_PORT1));
+
+ Server server2 = DistributedSearch.Server.getServer(conf, searchdir,
+ conf.getInt(DISTRIBUTED_SEARCH_TEST_PORT2, DEFAULT_PORT2));
+
+ server1.start();
+ server2.start();
+
+ /* Create a new file search-server.txt
+ * listing one server, at port DEFAULT_PORT1 (60001)
+ */
+ FileSystem fs = FileSystem.get(conf);
+ Path testServersPath = new Path(searchdir, "search-server.txt");
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(
+ testServersPath, true)));
+ bw.write("localhost " + DEFAULT_PORT1 + "\n");
+ bw.flush();
+ bw.close();
+
+ /*
+ * Check if it found the server
+ */
+ Client c = new DistributedSearch.Client(testServersPath, conf);
+ boolean[] liveServers = c.getLiveServer();
+ assertEquals(liveServers.length, 1);
+
+ /* Rewrite search-server.txt to list both servers,
+ * at ports DEFAULT_PORT1 (60001) and DEFAULT_PORT2 (60002)
+ */
+
+ // pause so the rewritten file gets a modification time later than the
+ // timestamp the client recorded; the client only reloads when it is newer
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
- InetSocketAddress[] addresses=new InetSocketAddress[1];
- addresses[0]=new InetSocketAddress("localhost", port);
+ bw = new BufferedWriter(new OutputStreamWriter(fs.create(testServersPath,
+ true)));
+ bw.write("localhost " + DEFAULT_PORT1 + "\n");
+ bw.write("localhost " + DEFAULT_PORT2 + "\n");
+ bw.flush();
+ bw.close();
+
+ // Check if it found both the servers
+ c.updateSegments();
- Client c=new DistributedSearch.Client(addresses, conf);
- Query query=Query.parse("apache", conf);
- Hits hits=c.search(query, 5, null, null, false);
- c.getDetails(hits.getHit(0));
- assertTrue(hits.getTotal()>0);
+ liveServers = c.getLiveServer();
+ assertEquals(liveServers.length, 2);
+
+ if (server1 != null) {
+ server1.stop();
+ }
+ if (server2 != null) {
+ server2.stop();
+ }
+
+ fs.delete(testServersPath);
}
}