Posted to commits@nutch.apache.org by do...@apache.org on 2009/01/12 14:26:35 UTC

svn commit: r733738 [2/3] - in /lucene/nutch/trunk: ./ bin/ lib/ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/indexer/lucene/ src/java/org/apache/nutch/indexer/solr/ src/java/org/apache/nutch/net/ src/ja...

Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,326 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.StringUtils;
+
+public class DistributedSearchBean implements SearchBean {
+
+  private static final ExecutorService executor =
+    Executors.newCachedThreadPool();
+
+  private final ScheduledExecutorService pingService;
+
+  private class SearchTask implements Callable<Hits> {
+    private int id;
+
+    private Query query;
+    private int numHits;
+    private String dedupField;
+    private String sortField;
+    private boolean reverse;
+
+    public SearchTask(int id) {
+      this.id = id;
+    }
+
+    public Hits call() throws Exception {
+      if (!liveServers[id]) {
+        return null;
+      }
+      return beans[id].search(query, numHits, dedupField, sortField, reverse);
+    }
+
+    public void setSearchArgs(Query query, int numHits, String dedupField,
+                              String sortField, boolean reverse) {
+      this.query = query;
+      this.numHits = numHits;
+      this.dedupField = dedupField;
+      this.sortField = sortField;
+      this.reverse = reverse;
+    }
+
+  }
+
+  private class DetailTask implements Callable<HitDetails[]> {
+    private int id;
+
+    private Hit[] hits;
+
+    public DetailTask(int id) {
+     this.id = id;
+    }
+
+    public HitDetails[] call() throws Exception {
+      if (hits == null) {
+        return null;
+      }
+      return beans[id].getDetails(hits);
+    }
+
+    public void setHits(Hit[] hits) {
+      this.hits = hits;
+    }
+
+  }
+
+  private class PingWorker implements Runnable {
+    private int id;
+
+    public PingWorker(int id) {
+      this.id = id;
+    }
+
+    public void run() {
+      try {
+        if (beans[id].ping()) {
+          liveServers[id] = true;
+        } else {
+          liveServers[id] = false;
+        }
+      } catch (IOException e) {
+        liveServers[id] = false;
+      }
+    }
+  }
+
+  private volatile boolean[] liveServers;
+
+  private SearchBean[] beans;
+
+  private List<Callable<Hits>> searchTasks;
+
+  private List<Callable<HitDetails[]>> detailTasks;
+
+  private List<PingWorker> pingWorkers;
+
+  private long timeout;
+
+  public DistributedSearchBean(Configuration conf,
+                               Path luceneConfig, Path solrConfig)
+  throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+
+    this.timeout = conf.getLong("ipc.client.timeout", 60000);
+
+    List<SearchBean> beanList = new ArrayList<SearchBean>();
+
+    if (fs.exists(luceneConfig)) {
+      addLuceneBeans(beanList, luceneConfig, conf);
+    }
+
+    if (fs.exists(solrConfig)) {
+      addSolrBeans(beanList, solrConfig, conf);
+    }
+
+    beans = beanList.toArray(new SearchBean[beanList.size()]);
+
+    liveServers = new boolean[beans.length];
+    for (int i = 0; i < liveServers.length; i++) {
+      liveServers[i] = true;
+    }
+
+    searchTasks = new ArrayList<Callable<Hits>>();
+    detailTasks = new ArrayList<Callable<HitDetails[]>>();
+    pingWorkers = new ArrayList<PingWorker>();
+
+    for (int i = 0; i < beans.length; i++) {
+      searchTasks.add(new SearchTask(i));
+      detailTasks.add(new DetailTask(i));
+      pingWorkers.add(new PingWorker(i));
+    }
+
+    pingService = Executors.newScheduledThreadPool(beans.length);
+    for (PingWorker worker : pingWorkers) {
+      pingService.scheduleAtFixedRate(worker, 0, 10, TimeUnit.SECONDS);
+    }
+
+  }
+
+  private static void addLuceneBeans(List<SearchBean> beanList,
+                                     Path luceneConfig, Configuration conf)
+  throws IOException {
+    Configuration newConf = new Configuration(conf);
+
+    // do not retry connections
+    newConf.setInt("ipc.client.connect.max.retries", 0);
+
+    List<InetSocketAddress> luceneServers =
+      NutchBean.readAddresses(luceneConfig, conf);
+    for (InetSocketAddress addr : luceneServers) {
+      beanList.add((RPCSearchBean) RPC.getProxy(RPCSearchBean.class,
+          LuceneSearchBean.VERSION, addr, newConf));
+    }
+  }
+
+  private static void addSolrBeans(List<SearchBean> beanList,
+                                   Path solrConfig, Configuration conf)
+  throws IOException {
+    for (String solrServer : NutchBean.readConfig(solrConfig, conf)) {
+      beanList.add(new SolrSearchBean(conf, solrServer));
+    }
+  }
+
+  public String getExplanation(Query query, Hit hit) throws IOException {
+    return beans[hit.getIndexNo()].getExplanation(query, hit);
+  }
+
+  public Hits search(Query query, int numHits, String dedupField,
+                     String sortField, boolean reverse) throws IOException {
+
+    for (Callable<Hits> task : searchTasks) {
+      ((SearchTask)task).setSearchArgs(query, numHits, dedupField, sortField,
+          reverse);
+    }
+
+    List<Future<Hits>> allHits;
+    try {
+      allHits =
+        executor.invokeAll(searchTasks, timeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    PriorityQueue<Hit> queue;            // cull top hits from results
+    if (sortField == null || reverse) {
+      queue = new PriorityQueue<Hit>(numHits);
+    } else {
+      queue = new PriorityQueue<Hit>(numHits, new Comparator<Hit>() {
+        public int compare(Hit h1, Hit h2) {
+          return h2.compareTo(h1); // reverse natural order
+        }
+      });
+    }
+
+    long totalHits = 0;
+    int allHitsSize = allHits.size();
+    for (int i = 0; i < allHitsSize; i++) {
+      Hits hits = null;
+      try {
+        hits = allHits.get(i).get();
+      } catch (InterruptedException e) {
+        // ignore
+      } catch (ExecutionException e) {
+        LOG.warn("Retrieving hits failed with exception: " +
+                 StringUtils.stringifyException(e.getCause()));
+      }
+
+      if (hits == null) {
+        continue;
+      }
+
+      totalHits += hits.getTotal();
+
+      int hitsLength = hits.getLength();
+      for (int j = 0; j < hitsLength; j++) {
+        Hit hit = hits.getHit(j);
+        Hit newHit = new Hit(i, hit.getUniqueKey(),
+                             hit.getSortValue(), hit.getDedupValue());
+        queue.add(newHit);
+        if (queue.size() > numHits) {         // if hit queue overfull
+          queue.remove();
+        }
+      }
+    }
+
+    // we have to sort results since PriorityQueue.toArray
+    // may not return results in sorted order
+    Hit[] culledResults = queue.toArray(new Hit[queue.size()]);
+    Arrays.sort(culledResults, Collections.reverseOrder(queue.comparator()));
+
+    return new Hits(totalHits, culledResults);
+  }
+
+  public void close() throws IOException {
+    executor.shutdown();
+    pingService.shutdown();
+    for (SearchBean bean : beans) {
+      bean.close();
+    }
+  }
+
+  public HitDetails getDetails(Hit hit) throws IOException {
+    return beans[hit.getIndexNo()].getDetails(hit);
+  }
+
+  @SuppressWarnings("unchecked")
+  public HitDetails[] getDetails(Hit[] hits) throws IOException {
+    List<Hit>[] hitList = new ArrayList[detailTasks.size()];
+
+    for (int i = 0; i < hitList.length; i++) {
+      hitList[i] = new ArrayList<Hit>();
+    }
+
+    for (int i = 0; i < hits.length; i++) {
+      Hit hit = hits[i];
+      hitList[hit.getIndexNo()].add(hit);
+    }
+
+    for (int i = 0; i < detailTasks.size(); i++) {
+      DetailTask task = (DetailTask)detailTasks.get(i);
+      if (hitList[i].size() > 0) {
+        task.setHits(hitList[i].toArray(new Hit[hitList[i].size()]));
+      } else {
+        task.setHits(null);
+      }
+    }
+
+    List<Future<HitDetails[]>> allDetails;
+    try {
+      allDetails =
+        executor.invokeAll(detailTasks, timeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    /* getDetails(Hit[]) assumes that the returned HitDetails[i] corresponds
+     * to Hit[i] given as a parameter. To keep this order, we have to 'merge'
+     * the HitDetails[] arrays returned from the individual detailTasks.
+     */
+    HitDetails[][] detailsMatrix = new HitDetails[detailTasks.size()][];
+    for (int i = 0; i < detailsMatrix.length; i++) {
+      try {
+        detailsMatrix[i] = allDetails.get(i).get();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new RuntimeException(e);
+      }
+    }
+
+    int[] hitPos = new int[detailTasks.size()]; // keep track of where we are
+    HitDetails[] detailsArr = new HitDetails[hits.length];
+    for (int i = 0; i < detailsArr.length; i++) {
+      int indexNo = hits[i].getIndexNo();
+      detailsArr[i] = detailsMatrix[indexNo][(hitPos[indexNo]++)];
+    }
+
+    return detailsArr;
+  }
+
+  public boolean ping() {
+    return true; // not used
+  }
+
+}
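
Note on the merge logic above: search() fans the query out to every live
backend with ExecutorService.invokeAll() under an ipc.client.timeout deadline,
then culls a global top-N by pushing every shard hit through a size-bounded
PriorityQueue. A minimal, self-contained sketch of that culling step follows;
the class name and integer scores are illustrative stand-ins, not part of the
commit:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.PriorityQueue;

    public class TopNMergeSketch {
      /** Keep only the n largest scores seen across all shard result lists. */
      static int[] mergeTopN(List<int[]> perShardScores, int n) {
        PriorityQueue<Integer> queue = new PriorityQueue<Integer>(n);
        for (int[] scores : perShardScores) {
          for (int score : scores) {
            queue.add(score);
            if (queue.size() > n) {     // queue overfull, evict the minimum
              queue.remove();
            }
          }
        }
        // PriorityQueue.toArray() is not sorted; sort explicitly, best first,
        // for the same reason search() sorts culledResults above.
        Integer[] top = queue.toArray(new Integer[queue.size()]);
        Arrays.sort(top, Collections.reverseOrder());
        int[] result = new int[top.length];
        for (int i = 0; i < top.length; i++) {
          result[i] = top[i];
        }
        return result;
      }

      public static void main(String[] args) {
        List<int[]> shards = new ArrayList<int[]>();
        shards.add(new int[] { 5, 1, 9 });
        shards.add(new int[] { 7, 3 });
        System.out.println(Arrays.toString(mergeTopN(shards, 3))); // [9, 7, 5]
      }
    }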

Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,214 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+
+public class DistributedSegmentBean implements SegmentBean {
+
+  private static final ExecutorService executor =
+    Executors.newCachedThreadPool();
+
+  private final ScheduledExecutorService pingService;
+
+  private class DistSummaryTask implements Callable<Summary[]> {
+    private int id;
+
+    private HitDetails[] details;
+    private Query query;
+
+    public DistSummaryTask(int id) {
+      this.id = id;
+    }
+
+    public Summary[] call() throws Exception {
+      if (details == null) {
+        return null;
+      }
+      return beans[id].getSummary(details, query);
+    }
+
+    public void setSummaryArgs(HitDetails[] details, Query query) {
+      this.details = details;
+      this.query = query;
+    }
+
+  }
+
+  private class SegmentWorker implements Runnable {
+    private int id;
+
+    public SegmentWorker(int id) {
+      this.id = id;
+    }
+
+    public void run() {
+      try {
+        String[] segments = beans[id].getSegmentNames();
+        for (String segment : segments) {
+          segmentMap.put(segment, id);
+        }
+      } catch (IOException e) {
+        // remove all segments this bean was serving
+        Iterator<Map.Entry<String, Integer>> i =
+          segmentMap.entrySet().iterator();
+        while (i.hasNext()) {
+          Map.Entry<String, Integer> entry = i.next();
+          int curId = entry.getValue();
+          if (curId == this.id) {
+            i.remove();
+          }
+        }
+      }
+    }
+  }
+
+  private long timeout;
+
+  private SegmentBean[] beans;
+
+  private ConcurrentMap<String, Integer> segmentMap;
+
+  private List<Callable<Summary[]>> summaryTasks;
+
+  private List<SegmentWorker> segmentWorkers;
+
+  public DistributedSegmentBean(Configuration conf, Path serversConfig)
+  throws IOException {
+    this.timeout = conf.getLong("ipc.client.timeout", 60000);
+
+    List<SegmentBean> beanList = new ArrayList<SegmentBean>();
+
+    List<InetSocketAddress> segmentServers =
+        NutchBean.readAddresses(serversConfig, conf);
+
+    for (InetSocketAddress addr : segmentServers) {
+      SegmentBean bean = (RPCSegmentBean) RPC.getProxy(RPCSegmentBean.class,
+          FetchedSegments.VERSION, addr, conf);
+      beanList.add(bean);
+    }
+
+    beans = beanList.toArray(new SegmentBean[beanList.size()]);
+
+    summaryTasks = new ArrayList<Callable<Summary[]>>(beans.length);
+    segmentWorkers = new ArrayList<SegmentWorker>(beans.length);
+
+    for (int i = 0; i < beans.length; i++) {
+      summaryTasks.add(new DistSummaryTask(i));
+      segmentWorkers.add(new SegmentWorker(i));
+    }
+
+    segmentMap = new ConcurrentHashMap<String, Integer>();
+
+    pingService = Executors.newScheduledThreadPool(beans.length);
+    for (SegmentWorker worker : segmentWorkers) {
+      pingService.scheduleAtFixedRate(worker, 0, 30, TimeUnit.SECONDS);
+    }
+  }
+
+  private SegmentBean getBean(HitDetails details) {
+    return beans[segmentMap.get(details.getValue("segment"))];
+  }
+
+  public String[] getSegmentNames() {
+    return segmentMap.keySet().toArray(new String[segmentMap.size()]);
+  }
+
+  public byte[] getContent(HitDetails details) throws IOException {
+    return getBean(details).getContent(details);
+  }
+
+  public long getFetchDate(HitDetails details) throws IOException {
+    return getBean(details).getFetchDate(details);
+  }
+
+  public ParseData getParseData(HitDetails details) throws IOException {
+    return getBean(details).getParseData(details);
+  }
+
+  public ParseText getParseText(HitDetails details) throws IOException {
+    return getBean(details).getParseText(details);
+  }
+
+  public void close() throws IOException {
+    executor.shutdown();
+    pingService.shutdown();
+    for (SegmentBean bean : beans) {
+      bean.close();
+    }
+  }
+
+  public Summary getSummary(HitDetails details, Query query)
+  throws IOException {
+    return getBean(details).getSummary(details, query);
+  }
+
+  @SuppressWarnings("unchecked")
+  public Summary[] getSummary(HitDetails[] detailsArr, Query query)
+  throws IOException {
+    List<HitDetails>[] detailsList = new ArrayList[summaryTasks.size()];
+    for (int i = 0; i < detailsList.length; i++) {
+      detailsList[i] = new ArrayList<HitDetails>();
+    }
+    for (HitDetails details : detailsArr) {
+      detailsList[segmentMap.get(details.getValue("segment"))].add(details);
+    }
+    for (int i = 0; i < summaryTasks.size(); i++) {
+      DistSummaryTask task = (DistSummaryTask)summaryTasks.get(i);
+      if (detailsList[i].size() > 0) {
+        HitDetails[] taskDetails =
+          detailsList[i].toArray(new HitDetails[detailsList[i].size()]);
+        task.setSummaryArgs(taskDetails, query);
+      } else {
+        task.setSummaryArgs(null, null);
+      }
+    }
+
+    List<Future<Summary[]>> summaries;
+    try {
+      summaries =
+        executor.invokeAll(summaryTasks, timeout, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    List<Summary> summaryList = new ArrayList<Summary>();
+    for (Future<Summary[]> f : summaries) {
+      Summary[] summaryArray;
+      try {
+        summaryArray = f.get();
+        if (summaryArray == null) {
+          continue;
+        }
+        for (Summary summary : summaryArray) {
+          summaryList.add(summary);
+        }
+      } catch (Exception e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
+        throw new RuntimeException(e);
+      }
+    }
+
+    return summaryList.toArray(new Summary[summaryList.size()]);
+  }
+
+}
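
The routing idea above: every 30 seconds each SegmentWorker republishes the
segment names its backend serves into a ConcurrentMap, and getBean() looks up
the owning server per request. A stripped-down sketch of that pattern follows,
with illustrative names only. Note, as a reading of the committed code, that
getBean() would throw a NullPointerException (unboxing a null Integer) if a
hit references a segment that has dropped out of the map.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class SegmentRoutingSketch {
      // segment name -> index of the server currently serving it
      private final ConcurrentMap<String, Integer> segmentMap =
        new ConcurrentHashMap<String, Integer>();
      private final ScheduledExecutorService pingService =
        Executors.newScheduledThreadPool(1);

      /** Republish a server's segment list on a fixed schedule. */
      public void watch(final int serverId, final String[] servedSegments) {
        pingService.scheduleAtFixedRate(new Runnable() {
          public void run() {
            for (String segment : servedSegments) {
              segmentMap.put(segment, serverId);
            }
          }
        }, 0, 30, TimeUnit.SECONDS);
      }

      /** Route a segment name to its current owner, or -1 if unknown. */
      public int route(String segment) {
        Integer id = segmentMap.get(segment);
        return (id == null) ? -1 : id.intValue();
      }
    }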

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java Mon Jan 12 05:26:16 2009
@@ -19,8 +19,16 @@
 
 import java.io.IOException;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.*;
@@ -35,20 +43,89 @@
 
 /** Implements {@link HitSummarizer} and {@link HitContent} for a set of
  * fetched segments. */
-public class FetchedSegments implements HitSummarizer, HitContent {
+public class FetchedSegments implements RPCSegmentBean {
 
-  private static class Segment implements Closeable {
-    
-    private static final Partitioner PARTITIONER = new HashPartitioner();
+  public static final long VERSION = 1L;
 
-    private FileSystem fs;
-    private Path segmentDir;
+  private static final ExecutorService executor =
+    Executors.newCachedThreadPool();
+
+  private class SummaryTask implements Callable<Summary> {
+    private final HitDetails details;
+    private final Query query;
+
+    public SummaryTask(HitDetails details, Query query) {
+      this.details = details;
+      this.query = query;
+    }
+
+    public Summary call() throws Exception {
+      return getSummary(details, query);
+    }
+  }
+
+  private class SegmentUpdater extends Thread {
+
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          final FileStatus[] fstats = fs.listStatus(segmentsDir,
+              HadoopFSUtil.getPassDirectoriesFilter(fs));
+          final Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
+          final Iterator<Map.Entry<String, Segment>> i =
+            segments.entrySet().iterator();
+          while (i.hasNext()) {
+            final Map.Entry<String, Segment> entry = i.next();
+            final Segment seg = entry.getValue();
+            if (!fs.exists(seg.segmentDir)) {
+              try {
+                seg.close();
+              } catch (final Exception e) {
+                /* A segment may fail to close
+                 * because it may already have been
+                 * deleted from the file system. So we
+                 * just ignore the exception and remove
+                 * the mapping from 'segments'.
+                 */
+              } finally {
+                i.remove();
+              }
+            }
+          }
+
+          if (segmentDirs != null) {
+            for (final Path segmentDir : segmentDirs) {
+              segments.putIfAbsent(segmentDir.getName(),
+                  new Segment(fs, segmentDir, conf));
+            }
+          }
+
+          Thread.sleep(60000);
+        } catch (final InterruptedException e) {
+          // ignore
+        } catch (final IOException e) {
+          // ignore
+        }
+      }
+    }
+
+  }
+
+
+  private static class Segment implements java.io.Closeable {
+
+    private static final Partitioner<Text, Writable> PARTITIONER =
+      new HashPartitioner<Text, Writable>();
+
+    private final FileSystem fs;
+    private final Path segmentDir;
 
     private MapFile.Reader[] content;
     private MapFile.Reader[] parseText;
     private MapFile.Reader[] parseData;
     private MapFile.Reader[] crawl;
-    private Configuration conf;
+    private final Configuration conf;
 
     public Segment(FileSystem fs, Path segmentDir, Configuration conf) throws IOException {
       this.fs = fs;
@@ -63,7 +140,7 @@
       }
       return (CrawlDatum)getEntry(crawl, url, new CrawlDatum());
     }
-    
+
     public byte[] getContent(Text url) throws IOException {
       synchronized (this) {
         if (content == null)
@@ -87,7 +164,7 @@
       }
       return (ParseText)getEntry(parseText, url, new ParseText());
     }
-    
+
     private MapFile.Reader[] getReaders(String subDir) throws IOException {
       return MapFileOutputFormat.getReaders(fs, new Path(segmentDir, subDir), this.conf);
     }
@@ -112,31 +189,37 @@
 
   }
 
-  private HashMap segments = new HashMap();
-  private Summarizer summarizer;
+  private final ConcurrentMap<String, Segment> segments =
+    new ConcurrentHashMap<String, Segment>();
+  private final FileSystem fs;
+  private final Configuration conf;
+  private final Path segmentsDir;
+  private final SegmentUpdater segUpdater;
+  private final Summarizer summarizer;
 
   /** Construct given a directory containing fetcher output. */
-  public FetchedSegments(FileSystem fs, String segmentsDir, Configuration conf) throws IOException {
-    FileStatus[] fstats = fs.listStatus(new Path(segmentsDir), 
+  public FetchedSegments(Configuration conf, Path segmentsDir)
+  throws IOException {
+    this.conf = conf;
+    this.fs = FileSystem.get(this.conf);
+    final FileStatus[] fstats = fs.listStatus(segmentsDir,
         HadoopFSUtil.getPassDirectoriesFilter(fs));
-    Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
-    this.summarizer = new SummarizerFactory(conf).getSummarizer();
+    final Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
+    this.summarizer = new SummarizerFactory(this.conf).getSummarizer();
+    this.segmentsDir = segmentsDir;
+    this.segUpdater = new SegmentUpdater();
 
     if (segmentDirs != null) {
-        for (int i = 0; i < segmentDirs.length; i++) {
-            Path segmentDir = segmentDirs[i];
-//             Path indexdone = new Path(segmentDir, IndexSegment.DONE_NAME);
-//             if (fs.exists(indexdone) && fs.isFile(indexdone)) {
-//             	segments.put(segmentDir.getName(), new Segment(fs, segmentDir));
-//             }
-            segments.put(segmentDir.getName(), new Segment(fs, segmentDir, conf));
-
-        }
+      for (final Path segmentDir : segmentDirs) {
+        segments.put(segmentDir.getName(),
+          new Segment(this.fs, segmentDir, this.conf));
+      }
     }
+    this.segUpdater.start();
   }
 
   public String[] getSegmentNames() {
-    return (String[])segments.keySet().toArray(new String[segments.size()]);
+    return segments.keySet().toArray(new String[segments.size()]);
   }
 
   public byte[] getContent(HitDetails details) throws IOException {
@@ -158,67 +241,57 @@
 
   public Summary getSummary(HitDetails details, Query query)
     throws IOException {
-    
-    if (this.summarizer == null) { return new Summary(); }
-    
-    Segment segment = getSegment(details);
-    ParseText parseText = segment.getParseText(getUrl(details));
-    String text = (parseText != null) ? parseText.getText() : "";
-    
-    return this.summarizer.getSummary(text, query);
-  }
-    
-  private class SummaryThread extends Thread {
-    private HitDetails details;
-    private Query query;
-
-    private Summary summary;
-    private Throwable throwable;
 
-    public SummaryThread(HitDetails details, Query query) {
-      this.details = details;
-      this.query = query;
-    }
+    if (this.summarizer == null) { return new Summary(); }
 
-    public void run() {
-      try {
-        this.summary = getSummary(details, query);
-      } catch (Throwable throwable) {
-        this.throwable = throwable;
-      }
-    }
+    final Segment segment = getSegment(details);
+    final ParseText parseText = segment.getParseText(getUrl(details));
+    final String text = (parseText != null) ? parseText.getText() : "";
 
+    return this.summarizer.getSummary(text, query);
   }
 
+  public long getProtocolVersion(String protocol, long clientVersion)
+  throws IOException {
+    return VERSION;
+  }
 
   public Summary[] getSummary(HitDetails[] details, Query query)
     throws IOException {
-    SummaryThread[] threads = new SummaryThread[details.length];
-    for (int i = 0; i < threads.length; i++) {
-      threads[i] = new SummaryThread(details[i], query);
-      threads[i].start();
+    final List<Callable<Summary>> tasks =
+      new ArrayList<Callable<Summary>>(details.length);
+    for (int i = 0; i < details.length; i++) {
+      tasks.add(new SummaryTask(details[i], query));
     }
 
-    Summary[] results = new Summary[details.length];
-    for (int i = 0; i < threads.length; i++) {
+    List<Future<Summary>> summaries;
+    try {
+      summaries = executor.invokeAll(tasks);
+    } catch (final InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+
+    final Summary[] results = new Summary[details.length];
+    for (int i = 0; i < details.length; i++) {
+      final Future<Summary> f = summaries.get(i);
+      Summary summary;
       try {
-        threads[i].join();
-      } catch (InterruptedException e) {
+        summary = f.get();
+      } catch (final Exception e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        }
         throw new RuntimeException(e);
       }
-      if (threads[i].throwable instanceof IOException) {
-        throw (IOException)threads[i].throwable;
-      } else if (threads[i].throwable != null) {
-        throw new RuntimeException(threads[i].throwable);
-      }
-      results[i] = threads[i].summary;
+      results[i] = summary;
     }
     return results;
   }
 
 
   private Segment getSegment(HitDetails details) {
-    return (Segment)segments.get(details.getValue("segment"));
+    return segments.get(details.getValue("segment"));
   }
 
   private Text getUrl(HitDetails details) {
@@ -230,10 +303,10 @@
   }
 
   public void close() throws IOException {
-    Iterator iterator = segments.values().iterator();
+    final Iterator<Segment> iterator = segments.values().iterator();
     while (iterator.hasNext()) {
-      ((Segment) iterator.next()).close();
+      iterator.next().close();
     }
   }
-  
+
 }
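
The FetchedSegments change above replaces the hand-rolled SummaryThread-per-
request approach with Callable tasks on a shared cached thread pool, unwrapping
ExecutionException back to IOException so callers still see the original
failure. A minimal sketch of that migration pattern under the same Java 5
executor API; the task body is a stand-in for getSummary(), not the real call:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class CallableMigrationSketch {
      private static final ExecutorService executor =
        Executors.newCachedThreadPool();

      /** Run one task per input in parallel, rethrowing IOException intact. */
      static String[] fetchAll(String[] inputs) throws IOException {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        for (final String input : inputs) {
          tasks.add(new Callable<String>() {
            public String call() throws IOException {
              return "summary of " + input;   // stand-in for getSummary(...)
            }
          });
        }
        String[] results = new String[inputs.length];
        try {
          List<Future<String>> futures = executor.invokeAll(tasks);
          for (int i = 0; i < futures.size(); i++) {
            try {
              results[i] = futures.get(i).get();
            } catch (ExecutionException e) {
              // unwrap the Callable's checked exception, as the new code does
              if (e.getCause() instanceof IOException) {
                throw (IOException) e.getCause();
              }
              throw new RuntimeException(e);
            }
          }
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
        return results;
      }
    }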

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java Mon Jan 12 05:26:16 2009
@@ -21,31 +21,33 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 /** A document which matched a query in an index. */
-public class Hit implements Writable, Comparable {
+@SuppressWarnings("unchecked")
+public class Hit implements Writable, Comparable<Hit> {
 
   private int indexNo;                            // index id
-  private int indexDocNo;                         // index-relative id
+  private String uniqueKey;
   private WritableComparable sortValue;           // value sorted on
   private String dedupValue;                      // value to dedup on
   private boolean moreFromDupExcluded;
 
   public Hit() {}
 
-  public Hit(int indexNo, int indexDocNo) {
-    this(indexNo, indexDocNo, null, null);
+  public Hit(int indexNo, String uniqueKey) {
+    this(indexNo, uniqueKey, null, null);
   }
-  public Hit(int indexNo, int indexDocNo,
-             WritableComparable sortValue,
+  public Hit(int indexNo, String uniqueKey,
+             WritableComparable sortValue,
              String dedupValue) {
-    this(indexDocNo, sortValue, dedupValue);
+    this(uniqueKey, sortValue, dedupValue);
     this.indexNo = indexNo;
   }
-  public Hit(int indexDocNo, WritableComparable sortValue, String dedupValue) {
-    this.indexDocNo = indexDocNo;
+  public Hit(String uniqueKey, WritableComparable sortValue, String dedupValue) {
+    this.uniqueKey = uniqueKey;
     this.sortValue = sortValue;
     this.dedupValue = dedupValue == null ? "" : dedupValue;
   }
@@ -54,8 +56,8 @@
   public int getIndexNo() { return indexNo; }
   public void setIndexNo(int indexNo) { this.indexNo = indexNo; }
 
-  /** Return the document number of this hit within an index. */
-  public int getIndexDocNo() { return indexDocNo; }
+  /** Return the unique identifier of this hit within an index. */
+  public String getUniqueKey() { return uniqueKey; }
 
   /** Return the value of the field that hits are sorted on. */
   public WritableComparable getSortValue() { return sortValue; }
@@ -73,39 +75,26 @@
 
   /** Display as a string. */
   public String toString() {
-    return "#" + indexDocNo;
+    return "#" + uniqueKey;
   }
 
-  public boolean equals(Object o) {
-    if (!(o instanceof Hit))
-      return false;
-    Hit other = (Hit)o;
-    return this.indexNo == other.indexNo
-      && this.indexDocNo == other.indexDocNo;
-  }
-
-  public int hashCode() {
-    return indexNo ^ indexDocNo;
-  }
-
-  public int compareTo(Object o) {
-    Hit other = (Hit)o;
+  public int compareTo(Hit other) {
     int compare = sortValue.compareTo(other.sortValue);
     if (compare != 0) {
       return compare;                             // use sortValue
     } else if (other.indexNo != this.indexNo) {
       return other.indexNo - this.indexNo;        // prefer later indexes
     } else {
-      return other.indexDocNo - this.indexDocNo;  // prefer later docs
+      return other.uniqueKey.compareTo(this.uniqueKey);  // prefer later doc
     }
   }
 
   public void write(DataOutput out) throws IOException {
-    out.writeInt(indexDocNo);
+    Text.writeString(out, uniqueKey);
   }
 
   public void readFields(DataInput in) throws IOException {
-    indexDocNo = in.readInt();
+    uniqueKey = Text.readString(in);
   }
 
 }
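
The Hit change above is a wire-format change: the int indexDocNo is replaced
by a String uniqueKey written with Hadoop's Text.writeString(), which lets a
hit carry any document identifier (a Lucene doc number rendered as a string,
or a Solr unique key). A small round-trip sketch of that serialization, with
an illustrative key value:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;

    public class UniqueKeyWireSketch {
      public static void main(String[] args) throws IOException {
        // write(DataOutput) now emits the key as a Hadoop Text string
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        Text.writeString(out, "http://example.com/page.html");

        // readFields(DataInput) reads it back the same way
        DataInputStream in =
          new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(Text.readString(in));
      }
    }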

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java Mon Jan 12 05:26:16 2009
@@ -73,14 +73,14 @@
 
   /** Returns all the values with the specified name. */
   public String[] getValues(String field) {
-   ArrayList vals = new ArrayList();
+   ArrayList<String> vals = new ArrayList<String>();
    for (int i=0; i<length; i++) {
      if (fields[i].equals(field)) {
        vals.add(values[i]);
      }
    }
    return (vals.size() > 0)
-            ? (String[]) vals.toArray(new String[vals.size()])
+            ? vals.toArray(new String[vals.size()])
             : null;
 }
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java Mon Jan 12 05:26:16 2009
@@ -65,7 +65,6 @@
     return results;
   }
 
-
   public void write(DataOutput out) throws IOException {
     out.writeLong(total);                         // write total hits
     out.writeInt(top.length);                     // write hits returned
@@ -74,12 +73,13 @@
                       
     for (int i = 0; i < top.length; i++) {
       Hit h = top[i];
-      out.writeInt(h.getIndexDocNo());            // write indexDocNo
+      Text.writeString(out, h.getUniqueKey());    // write uniqueKey
       h.getSortValue().write(out);                // write sortValue
       Text.writeString(out, h.getDedupValue());   // write dedupValue
     }
   }
 
+  @SuppressWarnings("unchecked")
   public void readFields(DataInput in) throws IOException {
     total = in.readLong();                        // read total hits
     top = new Hit[in.readInt()];                  // read hits returned
@@ -93,7 +93,7 @@
     }
 
     for (int i = 0; i < top.length; i++) {
-      int indexDocNo = in.readInt();              // read indexDocNo
+      String uniqueKey = Text.readString(in);            // read uniqueKey
 
       WritableComparable sortValue = null;
       try {
@@ -105,7 +105,7 @@
 
       String dedupValue = Text.readString(in);    // read dedupValue
 
-      top[i] = new Hit(indexDocNo, sortValue, dedupValue);
+      top[i] = new Hit(uniqueKey, sortValue, dedupValue);
     }
   }
 

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java Mon Jan 12 05:26:16 2009
@@ -103,12 +103,12 @@
 
   public String getExplanation(Query query, Hit hit) throws IOException {
     return luceneSearcher.explain(this.queryFilters.filter(query),
-                                  hit.getIndexDocNo()).toHtml();
+        Integer.valueOf(hit.getUniqueKey())).toHtml();
   }
 
   public HitDetails getDetails(Hit hit) throws IOException {
 
-    Document doc = luceneSearcher.doc(hit.getIndexDocNo());
+    Document doc = luceneSearcher.doc(Integer.valueOf(hit.getUniqueKey()));
 
     List docFields = doc.getFields();
     String[] fields = new String[docFields.size()];
@@ -162,7 +162,7 @@
 
       String dedupValue = dedupValues == null ? null : dedupValues[doc];
 
-      hits[i] = new Hit(doc, sortValue, dedupValue);
+      hits[i] = new Hit(Integer.toString(doc), sortValue, dedupValue);
     }
     return new Hits(topDocs.totalHits, hits);
   }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java Mon Jan 12 05:26:16 2009
@@ -18,7 +18,6 @@
 package org.apache.nutch.searcher;
 
 import org.apache.lucene.search.Searcher;
-import org.apache.lucene.search.QueryFilter;
 import org.apache.lucene.search.*;
 import org.apache.lucene.index.Term;
 import org.apache.lucene.misc.ChainedFilter;
@@ -83,6 +82,7 @@
   }
   
 
+  @SuppressWarnings("serial")
   private static class TimeExceeded extends RuntimeException {
     public long maxTime;
     private int maxDoc;
@@ -127,7 +127,8 @@
     }
   }
   
-  private static class LimitExceeded extends RuntimeException {
+  @SuppressWarnings("serial")
+  private static class LimitExceeded extends RuntimeException {
     private int maxDoc;
     public LimitExceeded(int maxDoc) { this.maxDoc = maxDoc; }    
   }
@@ -151,13 +152,14 @@
    * @param threshold
    *          the fraction of documents which must contain a term
    */
-  public LuceneQueryOptimizer(Configuration conf) {
+  @SuppressWarnings("serial")
+  public LuceneQueryOptimizer(Configuration conf) {
     final int cacheSize = conf.getInt("searcher.filter.cache.size", 16);
     this.threshold = conf.getFloat("searcher.filter.cache.threshold",
         0.05f);
     this.searcherMaxHits = conf.getInt("searcher.max.hits", -1);
     this.cache = new LinkedHashMap<BooleanQuery, Filter>(cacheSize, 0.75f, true) {
-      protected boolean removeEldestEntry(Map.Entry eldest) {
+      protected boolean removeEldestEntry(Map.Entry<BooleanQuery, Filter> eldest) {
         return size() > cacheSize; // limit size of cache
       }
     };
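
The cache touched above is the classic access-ordered LinkedHashMap LRU: the
three-argument constructor enables access ordering, and overriding
removeEldestEntry() evicts once searcher.filter.cache.size is exceeded. A tiny
self-contained demonstration of the idiom:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruCacheSketch {
      public static void main(String[] args) {
        final int cacheSize = 2;
        // accessOrder = true makes get() refresh an entry's recency;
        // removeEldestEntry evicts once the size cap is exceeded.
        Map<String, String> cache =
          new LinkedHashMap<String, String>(cacheSize, 0.75f, true) {
            protected boolean removeEldestEntry(
                Map.Entry<String, String> eldest) {
              return size() > cacheSize;
            }
          };
        cache.put("a", "1");
        cache.put("b", "2");
        cache.get("a");        // touch "a" so "b" becomes the eldest entry
        cache.put("c", "3");   // evicts "b"
        System.out.println(cache.keySet()); // prints [a, c]
      }
    }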

Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nutch.indexer.Indexer;
+import org.apache.nutch.util.HadoopFSUtil;
+
+public class LuceneSearchBean implements RPCSearchBean {
+
+  public static final long VERSION = 1L;
+
+  private IndexSearcher searcher;
+
+  private FileSystem fs;
+
+  private Configuration conf;
+
+  /**
+   * Construct over a merged index or a set of individual indexes.
+   * @param conf configuration
+   * @param indexDir merged index directory, used if it exists
+   * @param indexesDir directory of individual indexes, used otherwise
+   */
+  public LuceneSearchBean(Configuration conf, Path indexDir, Path indexesDir)
+  throws IOException {
+    this.conf = conf;
+    this.fs = FileSystem.get(this.conf);
+    init(indexDir, indexesDir);
+  }
+
+  private void init(Path indexDir, Path indexesDir)
+  throws IOException {
+    if (this.fs.exists(indexDir)) {
+      LOG.info("opening merged index in " + indexDir);
+      this.searcher = new IndexSearcher(indexDir, this.conf);
+    } else {
+      LOG.info("opening indexes in " + indexesDir);
+
+      List<Path> vDirs = new ArrayList<Path>();
+      FileStatus[] fstats = fs.listStatus(indexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
+      Path[] directories = HadoopFSUtil.getPaths(fstats);
+      for(int i = 0; i < directories.length; i++) {
+        Path indexdone = new Path(directories[i], Indexer.DONE_NAME);
+        if(fs.isFile(indexdone)) {
+          vDirs.add(directories[i]);
+        }
+      }
+
+      directories = new Path[ vDirs.size() ];
+      for(int i = 0; vDirs.size()>0; i++) {
+        directories[i] = vDirs.remove(0);
+      }
+
+      this.searcher = new IndexSearcher(directories, this.conf);
+    }
+  }
+
+  public Hits search(Query query, int numHits, String dedupField,
+                     String sortField, boolean reverse)
+  throws IOException {
+    return searcher.search(query, numHits, dedupField, sortField, reverse);
+  }
+
+  public String getExplanation(Query query, Hit hit) throws IOException {
+    return searcher.getExplanation(query, hit);
+  }
+
+  public HitDetails getDetails(Hit hit) throws IOException {
+    return searcher.getDetails(hit);
+  }
+
+  public HitDetails[] getDetails(Hit[] hits) throws IOException {
+    return searcher.getDetails(hits);
+  }
+
+  public boolean ping() throws IOException {
+    return true;
+  }
+
+  public void close() throws IOException {
+    if (searcher != null) { searcher.close(); }
+    if (fs != null) { fs.close(); }
+  }
+
+  public long getProtocolVersion(String protocol, long clientVersion)
+  throws IOException {
+    return VERSION;
+  }
+
+}
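
init() above opens the merged index when it exists and otherwise collects only
those per-part index directories that contain the Indexer.DONE_NAME marker
file. A sketch of that selection using plain java.io.File for brevity; the
committed code performs the same check against a Hadoop FileSystem with
fs.isFile():

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    public class DoneMarkerFilterSketch {
      /** Keep only index dirs containing the indexer's completion marker. */
      static List<File> finishedIndexes(File indexesDir, String doneName) {
        List<File> finished = new ArrayList<File>();
        File[] candidates = indexesDir.listFiles();
        if (candidates != null) {
          for (File dir : candidates) {
            if (dir.isDirectory() && new File(dir, doneName).isFile()) {
              finished.add(dir);
            }
          }
        }
        return finished;
      }
    }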

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java Mon Jan 12 05:26:16 2009
@@ -18,29 +18,28 @@
 package org.apache.nutch.searcher;
 
 import java.io.*;
+import java.net.InetSocketAddress;
 import java.util.*;
+
 import javax.servlet.*;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.nutch.parse.*;
-import org.apache.nutch.indexer.*;
 import org.apache.nutch.crawl.Inlinks;
-import org.apache.nutch.util.HadoopFSUtil;
 import org.apache.nutch.util.NutchConfiguration;
 
-/** 
+/**
  * One stop shopping for search-related functionality.
  * @version $Id: NutchBean.java,v 1.19 2005/02/07 19:10:08 cutting Exp $
- */   
+ */
 public class NutchBean
-  implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks,
-             DistributedSearch.Protocol, Closeable {
+implements SearchBean, SegmentBean, HitInlinks, Closeable {
 
   public static final Log LOG = LogFactory.getLog(NutchBean.class);
   public static final String KEY = "nutchBean";
@@ -51,122 +50,105 @@
 
   private String[] segmentNames;
 
-  private Searcher searcher;
-  private HitDetailer detailer;
-  private HitSummarizer summarizer;
-  private HitContent content;
-  private HitInlinks linkDb;
+  private SearchBean searchBean;
+  private SegmentBean segmentBean;
+  private final HitInlinks linkDb;
 
 
   /** BooleanQuery won't permit more than 32 required/prohibited clauses.  We
-   * don't want to use too many of those. */ 
+   * don't want to use too many of those. */
   private static final int MAX_PROHIBITED_TERMS = 20;
-  
-  private Configuration conf;
 
-  private FileSystem fs;
+  private final Configuration conf;
+
+  private final FileSystem fs;
 
-  /** Returns the cached instance in the servlet context. 
+  /** Returns the cached instance in the servlet context.
    * @see NutchBeanConstructor*/
   public static NutchBean get(ServletContext app, Configuration conf) throws IOException {
-    NutchBean bean = (NutchBean)app.getAttribute(KEY);
+    final NutchBean bean = (NutchBean)app.getAttribute(KEY);
     return bean;
   }
 
 
   /**
-   * 
+   *
    * @param conf
    * @throws IOException
    */
   public NutchBean(Configuration conf) throws IOException {
     this(conf, null);
   }
-  
+
   /**
-   *  Construct in a named directory. 
+   * Construct in a named directory.
+   *
    * @param conf
    * @param dir
    * @throws IOException
    */
   public NutchBean(Configuration conf, Path dir) throws IOException {
-        this.conf = conf;
-        this.fs = FileSystem.get(this.conf);
-        if (dir == null) {
-            dir = new Path(this.conf.get("searcher.dir", "crawl"));
-        }
-        Path servers = new Path(dir, "search-servers.txt");
-        if (fs.exists(servers)) {
-            if (LOG.isInfoEnabled()) {
-              LOG.info("searching servers in " + servers);
-            }
-            init(new DistributedSearch.Client(servers, conf));
-        } else {
-            init(new Path(dir, "index"), new Path(dir, "indexes"), new Path(
-                    dir, "segments"), new Path(dir, "linkdb"));
-        }
+    this.conf = conf;
+    this.fs = FileSystem.get(this.conf);
+    if (dir == null) {
+      dir = new Path(this.conf.get("searcher.dir", "crawl"));
     }
+    final Path luceneConfig = new Path(dir, "search-servers.txt");
+    final Path solrConfig = new Path(dir, "solr-servers.txt");
+    final Path segmentConfig = new Path(dir, "segment-servers.txt");
 
-  private void init(Path indexDir, Path indexesDir, Path segmentsDir,
-                    Path linkDb)
-    throws IOException {
-    IndexSearcher indexSearcher;
-    if (this.fs.exists(indexDir)) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("opening merged index in " + indexDir);
-      }
-      indexSearcher = new IndexSearcher(indexDir, this.conf);
+    if (fs.exists(luceneConfig) || fs.exists(solrConfig)) {
+      searchBean = new DistributedSearchBean(conf, luceneConfig, solrConfig);
     } else {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("opening indexes in " + indexesDir);
-      }
-      
-      Vector vDirs=new Vector();
-      FileStatus[] fstats = fs.listStatus(indexesDir,
-          HadoopFSUtil.getPassDirectoriesFilter(fs));
-      Path [] directories = HadoopFSUtil.getPaths(fstats);
-      for(int i = 0; i < directories.length; i++) {
-        Path indexdone = new Path(directories[i], Indexer.DONE_NAME);
-        if(fs.isFile(indexdone)) {
-          vDirs.add(directories[i]);
+      final Path indexDir = new Path(dir, "index");
+      final Path indexesDir = new Path(dir, "indexes");
+      searchBean = new LuceneSearchBean(conf, indexDir, indexesDir);
+    }
+
+    if (fs.exists(segmentConfig)) {
+      segmentBean = new DistributedSegmentBean(conf, segmentConfig);
+    } else if (fs.exists(luceneConfig)) {
+      segmentBean = new DistributedSegmentBean(conf, luceneConfig);
+    } else {
+      segmentBean = new FetchedSegments(conf, new Path(dir, "segments"));
+    }
+
+    linkDb = new LinkDbInlinks(fs, new Path(dir, "linkdb"), conf);
+  }
+
+  public static List<InetSocketAddress> readAddresses(Path path,
+      Configuration conf) throws IOException {
+    final List<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+    for (final String line : readConfig(path, conf)) {
+      final StringTokenizer tokens = new StringTokenizer(line);
+      if (tokens.hasMoreTokens()) {
+        final String host = tokens.nextToken();
+        if (tokens.hasMoreTokens()) {
+          final String port = tokens.nextToken();
+          addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
         }
       }
-      
-      
-      directories = new Path[ vDirs.size() ];
-      for(int i = 0; vDirs.size()>0; i++) {
-        directories[i]=(Path)vDirs.remove(0);
-      }
-      
-      indexSearcher = new IndexSearcher(directories, this.conf);
     }
+    return addrs;
+  }
 
-    if (LOG.isInfoEnabled()) {
-      LOG.info("opening segments in " + segmentsDir);
+  public static List<String> readConfig(Path path, Configuration conf)
+  throws IOException {
+    final FileSystem fs = FileSystem.get(conf);
+    final BufferedReader reader =
+      new BufferedReader(new InputStreamReader(fs.open(path)));
+    try {
+      final ArrayList<String> addrs = new ArrayList<String>();
+      String line;
+      while ((line = reader.readLine()) != null) {
+        addrs.add(line);
+      }
+      return addrs;
+    } finally {
+      reader.close();
     }
-    FetchedSegments segments = new FetchedSegments(this.fs, segmentsDir.toString(),this.conf);
-    
-    this.segmentNames = segments.getSegmentNames();
-
-    this.searcher = indexSearcher;
-    this.detailer = indexSearcher;
-    this.summarizer = segments;
-    this.content = segments;
-
-    if (LOG.isInfoEnabled()) { LOG.info("opening linkdb in " + linkDb); }
-    this.linkDb = new LinkDbInlinks(fs, linkDb, this.conf);
-  }
-
-  private void init(DistributedSearch.Client client) {
-    this.segmentNames = client.getSegmentNames();
-    this.searcher = client;
-    this.detailer = client;
-    this.summarizer = client;
-    this.content = client;
-    this.linkDb = client;
   }
 
-
   public String[] getSegmentNames() {
     return segmentNames;
   }
@@ -174,15 +156,16 @@
   public Hits search(Query query, int numHits) throws IOException {
     return search(query, numHits, null, null, false);
   }
-  
+
   public Hits search(Query query, int numHits,
                      String dedupField, String sortField, boolean reverse)
     throws IOException {
 
-    return searcher.search(query, numHits, dedupField, sortField, reverse);
+    return searchBean.search(query, numHits, dedupField, sortField, reverse);
   }
-  
-  private class DupHits extends ArrayList {
+
+  @SuppressWarnings("serial")
+  private class DupHits extends ArrayList<Hit> {
     private boolean maxSizeExceeded;
   }
 
@@ -191,7 +174,7 @@
    * site are removed from results.  The remaining hits have {@link
    * Hit#moreFromDupExcluded()} set.  <p> If maxHitsPerDup is zero then all
    * hits are returned.
-   * 
+   *
    * @param query query
    * @param numHits number of requested hits
    * @param maxHitsPerDup the maximum hits returned with matching values, or zero
@@ -208,7 +191,7 @@
    * <code>maxHitsPerDup</code> are removed from results.  The remaining hits
    * have {@link Hit#moreFromDupExcluded()} set.  <p> If maxHitsPerDup is zero
    * then all hits are returned.
-   * 
+   *
    * @param query query
    * @param numHits number of requested hits
    * @param maxHitsPerDup the maximum hits returned with matching values, or zero
@@ -226,7 +209,7 @@
    * <code>maxHitsPerDup</code> are removed from results.  The remaining hits
    * have {@link Hit#moreFromDupExcluded()} set.  <p> If maxHitsPerDup is zero
    * then all hits are returned.
-   * 
+   *
    * @param query query
    * @param numHits number of requested hits
    * @param maxHitsPerDup the maximum hits returned with matching values, or zero
@@ -243,35 +226,35 @@
     if (maxHitsPerDup <= 0)                      // disable dup checking
       return search(query, numHits, dedupField, sortField, reverse);
 
-    float rawHitsFactor = this.conf.getFloat("searcher.hostgrouping.rawhits.factor", 2.0f);
+    final float rawHitsFactor = this.conf.getFloat("searcher.hostgrouping.rawhits.factor", 2.0f);
     int numHitsRaw = (int)(numHits * rawHitsFactor);
     if (LOG.isInfoEnabled()) {
       LOG.info("searching for "+numHitsRaw+" raw hits");
     }
-    Hits hits = searcher.search(query, numHitsRaw,
+    Hits hits = searchBean.search(query, numHitsRaw,
                                 dedupField, sortField, reverse);
-    long total = hits.getTotal();
-    Map dupToHits = new HashMap();
-    List resultList = new ArrayList();
-    Set seen = new HashSet();
-    List excludedValues = new ArrayList();
+    final long total = hits.getTotal();
+    final Map<String, DupHits> dupToHits = new HashMap<String, DupHits>();
+    final List<Hit> resultList = new ArrayList<Hit>();
+    final Set<Hit> seen = new HashSet<Hit>();
+    final List<String> excludedValues = new ArrayList<String>();
     boolean totalIsExact = true;
     for (int rawHitNum = 0; rawHitNum < hits.getTotal(); rawHitNum++) {
       // get the next raw hit
       if (rawHitNum >= hits.getLength()) {
         // optimize query by prohibiting more matches on some excluded values
-        Query optQuery = (Query)query.clone();
+        final Query optQuery = (Query)query.clone();
         for (int i = 0; i < excludedValues.size(); i++) {
           if (i == MAX_PROHIBITED_TERMS)
             break;
-          optQuery.addProhibitedTerm(((String)excludedValues.get(i)),
+          optQuery.addProhibitedTerm(excludedValues.get(i),
                                      dedupField);
         }
         numHitsRaw = (int)(numHitsRaw * rawHitsFactor);
         if (LOG.isInfoEnabled()) {
           LOG.info("re-searching for "+numHitsRaw+" raw hits, query: "+optQuery);
         }
-        hits = searcher.search(optQuery, numHitsRaw,
+        hits = searchBean.search(optQuery, numHitsRaw,
                                dedupField, sortField, reverse);
         if (LOG.isInfoEnabled()) {
           LOG.info("found "+hits.getTotal()+" raw hits");
@@ -280,14 +263,14 @@
         continue;
       }
 
-      Hit hit = hits.getHit(rawHitNum);
+      final Hit hit = hits.getHit(rawHitNum);
       if (seen.contains(hit))
         continue;
       seen.add(hit);
-      
+
       // get dup hits for its value
-      String value = hit.getDedupValue();
-      DupHits dupHits = (DupHits)dupToHits.get(value);
+      final String value = hit.getDedupValue();
+      DupHits dupHits = dupToHits.get(value);
       if (dupHits == null)
         dupToHits.put(value, dupHits = new DupHits());
 
@@ -297,7 +280,7 @@
 
           // mark prior hits with moreFromDupExcluded
           for (int i = 0; i < dupHits.size(); i++) {
-            ((Hit)dupHits.get(i)).setMoreFromDupExcluded(true);
+            dupHits.get(i).setMoreFromDupExcluded(true);
           }
           dupHits.maxSizeExceeded = true;
 
@@ -316,45 +299,45 @@
       }
     }
 
-    Hits results =
+    final Hits results =
       new Hits(total,
-               (Hit[])resultList.toArray(new Hit[resultList.size()]));
+               resultList.toArray(new Hit[resultList.size()]));
     results.setTotalIsExact(totalIsExact);
     return results;
   }
-    
+
 
   public String getExplanation(Query query, Hit hit) throws IOException {
-    return searcher.getExplanation(query, hit);
+    return searchBean.getExplanation(query, hit);
   }
 
   public HitDetails getDetails(Hit hit) throws IOException {
-    return detailer.getDetails(hit);
+    return searchBean.getDetails(hit);
   }
 
   public HitDetails[] getDetails(Hit[] hits) throws IOException {
-    return detailer.getDetails(hits);
+    return searchBean.getDetails(hits);
   }
 
   public Summary getSummary(HitDetails hit, Query query) throws IOException {
-    return summarizer.getSummary(hit, query);
+    return segmentBean.getSummary(hit, query);
   }
 
   public Summary[] getSummary(HitDetails[] hits, Query query)
     throws IOException {
-    return summarizer.getSummary(hits, query);
+    return segmentBean.getSummary(hits, query);
   }
 
   public byte[] getContent(HitDetails hit) throws IOException {
-    return content.getContent(hit);
+    return segmentBean.getContent(hit);
   }
 
   public ParseData getParseData(HitDetails hit) throws IOException {
-    return content.getParseData(hit);
+    return segmentBean.getParseData(hit);
   }
 
   public ParseText getParseText(HitDetails hit) throws IOException {
-    return content.getParseText(hit);
+    return segmentBean.getParseText(hit);
   }
 
   public String[] getAnchors(HitDetails hit) throws IOException {
@@ -366,67 +349,80 @@
   }
 
   public long getFetchDate(HitDetails hit) throws IOException {
-    return content.getFetchDate(hit);
+    return segmentBean.getFetchDate(hit);
   }
 
   public void close() throws IOException {
-    if (content != null) { content.close(); }
-    if (searcher != null) { searcher.close(); }
+    if (searchBean != null) { searchBean.close(); }
+    if (segmentBean != null) { segmentBean.close(); }
     if (linkDb != null) { linkDb.close(); }
     if (fs != null) { fs.close(); }
   }
-  
+
+  public boolean ping() {
+    return true;
+  }
+
   /** For debugging. */
   public static void main(String[] args) throws Exception {
-    String usage = "NutchBean query";
+    final String usage = "NutchBean query";
 
     if (args.length == 0) {
       System.err.println(usage);
       System.exit(-1);
     }
 
-    Configuration conf = NutchConfiguration.create();
-    NutchBean bean = new NutchBean(conf);
-    Query query = Query.parse(args[0], conf);
-    Hits hits = bean.search(query, 10);
+    final Configuration conf = NutchConfiguration.create();
+    final NutchBean bean = new NutchBean(conf);
+    final Query query = Query.parse(args[0], conf);
+    final Hits hits = bean.search(query, 10);
     System.out.println("Total hits: " + hits.getTotal());
-    int length = (int)Math.min(hits.getTotal(), 10);
-    Hit[] show = hits.getHits(0, length);
-    HitDetails[] details = bean.getDetails(show);
-    Summary[] summaries = bean.getSummary(details, query);
+    final int length = (int)Math.min(hits.getTotal(), 10);
+    final Hit[] show = hits.getHits(0, length);
+    final HitDetails[] details = bean.getDetails(show);
+    final Summary[] summaries = bean.getSummary(details, query);
 
     for (int i = 0; i < hits.getLength(); i++) {
-      System.out.println(" "+i+" "+ details[i] + "\n" + summaries[i]);
+      System.out.println(" " + i + " " + details[i] + "\n" + summaries[i]);
     }
   }
 
-  public long getProtocolVersion(String className, long arg1) throws IOException {
-    if(DistributedSearch.Protocol.class.getName().equals(className)){
-      return 1;
+  public long getProtocolVersion(String className, long clientVersion)
+  throws IOException {
+    if (RPCSearchBean.class.getName().equals(className) &&
+       searchBean instanceof RPCSearchBean) {
+
+      final RPCSearchBean rpcBean = (RPCSearchBean)searchBean;
+      return rpcBean.getProtocolVersion(className, clientVersion);
+    } else if (SegmentBean.class.getName().equals(className) &&
+               segmentBean instanceof RPCSegmentBean) {
+
+      final RPCSegmentBean rpcBean = (RPCSegmentBean)segmentBean;
+      return rpcBean.getProtocolVersion(className, clientVersion);
     } else {
       throw new IOException("Unknown Protocol classname:" + className);
     }
   }
 
-  /** Responsible for constructing a NutchBean singleton instance and 
-   *  caching it in the servlet context. This class should be registered in 
-   *  the deployment descriptor as a listener 
+  /** Responsible for constructing a NutchBean singleton instance and
+   *  caching it in the servlet context. This class should be registered in
+   *  the deployment descriptor as a listener.
    */
   public static class NutchBeanConstructor implements ServletContextListener {
-    
+
     public void contextDestroyed(ServletContextEvent sce) { }
 
     public void contextInitialized(ServletContextEvent sce) {
-      ServletContext app = sce.getServletContext();
-      Configuration conf = NutchConfiguration.get(app);
-      
+      final ServletContext app = sce.getServletContext();
+      final Configuration conf = NutchConfiguration.get(app);
+
       LOG.info("creating new bean");
       NutchBean bean = null;
       try {
         bean = new NutchBean(conf);
         app.setAttribute(KEY, bean);
       }
-      catch (IOException ex) {
+      catch (final IOException ex) {
         LOG.error(StringUtils.stringifyException(ex));
       }
     }

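For reference, the bean cached by NutchBeanConstructor can be fetched back out of the servlet context. A minimal sketch of the lookup side (it assumes KEY is the publicly visible attribute name used in app.setAttribute(KEY, bean) above, and that the helper lives in org.apache.nutch.searcher; the helper class itself is hypothetical):

    import javax.servlet.ServletContext;

    // Hypothetical helper; assumes NutchBean.KEY is the attribute name
    // that NutchBeanConstructor stores the singleton under.
    public final class BeanLookup {

      private BeanLookup() { }

      /** Returns the cached NutchBean, or null if the listener was not
       *  registered in web.xml (or failed during initialization). */
      public static NutchBean fromContext(ServletContext app) {
        return (NutchBean) app.getAttribute(NutchBean.KEY);
      }
    }
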
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java Mon Jan 12 05:26:16 2009
@@ -43,8 +43,10 @@
 
 /** Present search results using A9's OpenSearch extensions to RSS, plus a few
  * Nutch-specific extensions. */   
+@SuppressWarnings("serial")
 public class OpenSearchServlet extends HttpServlet {
-  private static final Map NS_MAP = new HashMap();
+  private static final Map<String, String> NS_MAP =
+    new HashMap<String, String>();
   private int MAX_HITS_PER_PAGE;
 
   static {
@@ -52,7 +54,7 @@
     NS_MAP.put("nutch", "http://www.nutch.org/opensearchrss/1.0/");
   }
 
-  private static final Set SKIP_DETAILS = new HashSet();
+  private static final Set<String> SKIP_DETAILS = new HashSet<String>();
   static {
     SKIP_DETAILS.add("url");                   // redundant with RSS link
     SKIP_DETAILS.add("title");                 // redundant with RSS title
@@ -171,8 +173,8 @@
       Element rss = addNode(doc, doc, "rss");
       addAttribute(doc, rss, "version", "2.0");
       addAttribute(doc, rss, "xmlns:opensearch",
-                   (String)NS_MAP.get("opensearch"));
-      addAttribute(doc, rss, "xmlns:nutch", (String)NS_MAP.get("nutch"));
+                   NS_MAP.get("opensearch"));
+      addAttribute(doc, rss, "xmlns:nutch", NS_MAP.get("nutch"));
 
       Element channel = addNode(doc, rss, "channel");
     
@@ -214,7 +216,7 @@
         HitDetails detail = details[i];
         String title = detail.getValue("title");
         String url = detail.getValue("url");
-        String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getIndexDocNo();
+        String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getUniqueKey();
       
         if (title == null || title.equals("")) {   // use url for docs w/o title
           title = url;
@@ -283,7 +285,7 @@
 
   private static void addNode(Document doc, Node parent,
                               String ns, String name, String text) {
-    Element child = doc.createElementNS((String)NS_MAP.get(ns), ns+":"+name);
+    Element child = doc.createElementNS(NS_MAP.get(ns), ns+":"+name);
     child.appendChild(doc.createTextNode(getLegalXml(text)));
     parent.appendChild(child);
   }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java Mon Jan 12 05:26:16 2009
@@ -282,7 +282,7 @@
   }
 
 
-  private ArrayList clauses = new ArrayList();
+  private ArrayList<Clause> clauses = new ArrayList<Clause>();
 
   private Configuration conf;
 
@@ -305,7 +305,7 @@
   
   /** Return all clauses. */
   public Clause[] getClauses() {
-    return (Clause[])clauses.toArray(CLAUSES_PROTO);
+    return clauses.toArray(CLAUSES_PROTO);
   }
 
   /** Add a required term in the default field. */
@@ -361,7 +361,7 @@
   public void write(DataOutput out) throws IOException {
     out.writeByte(clauses.size());
     for (int i = 0; i < clauses.size(); i++)
-      ((Clause)clauses.get(i)).write(out);
+      clauses.get(i).write(out);
   }
   
   public static Query read(DataInput in, Configuration conf) throws IOException {
@@ -404,7 +404,7 @@
     } catch (CloneNotSupportedException e) {
       throw new RuntimeException(e);
     }
-    clone.clauses = (ArrayList)clauses.clone();
+    clone.clauses = (ArrayList<Clause>)clauses.clone();
     return clone;
   }
 
@@ -412,9 +412,9 @@
   /** Flattens a query into the set of text terms that it contains.  These are
    * terms which should be higlighted in matching documents. */
   public String[] getTerms() {
-    ArrayList result = new ArrayList();
+    ArrayList<String> result = new ArrayList<String>();
     for (int i = 0; i < clauses.size(); i++) {
-      Clause clause = (Clause)clauses.get(i);
+      Clause clause = clauses.get(i);
       if (!clause.isProhibited()) {
         if (clause.isPhrase()) {
           Term[] terms = clause.getPhrase().getTerms();
@@ -426,7 +426,7 @@
         }
       }
     }
-    return (String[])result.toArray(new String[result.size()]);
+    return result.toArray(new String[result.size()]);
   }
 
   /**
@@ -457,7 +457,7 @@
     for (int i = 0; i < clauses.length; i++) {
       Clause c = clauses[i];
       if (!new QueryFilters(conf).isField(c.getField())) {  // unknown field
-        ArrayList terms = new ArrayList();        // add name to query
+        ArrayList<Term> terms = new ArrayList<Term>();        // add name to query
         if (c.isPhrase()) {                       
           terms.addAll(Arrays.asList(c.getPhrase().getTerms()));
         } else {
@@ -467,7 +467,7 @@
         c = (Clause)c.clone();
         c.field = Clause.DEFAULT_FIELD;           // use default field instead
         c.termOrPhrase
-          = new Phrase((Term[])terms.toArray(new Term[terms.size()]));
+          = new Phrase(terms.toArray(new Term[terms.size()]));
       }
       output.clauses.add(c);                    // copy clause to output
     }

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java Mon Jan 12 05:26:16 2009
@@ -17,6 +17,7 @@
 
 package org.apache.nutch.searcher;
 
+@SuppressWarnings("serial")
 public class QueryException extends java.io.IOException {
   public QueryException(String message) {
     super(message);

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java Mon Jan 12 05:26:16 2009
@@ -40,14 +40,14 @@
   private static final Log LOG = LogFactory.getLog(QueryFilters.class);
 
   private QueryFilter[] queryFilters;
-  private HashSet FIELD_NAMES ;
-  private HashSet RAW_FIELD_NAMES;
+  private HashSet<String> FIELD_NAMES;
+  private HashSet<String> RAW_FIELD_NAMES;
 
-  private static ArrayList parseFieldNames(Extension extension,
+  private static List<String> parseFieldNames(Extension extension,
                                            String attribute) {
     String fields = extension.getAttribute(attribute);
     if (fields == null) fields = "";
-    return Collections.list(new StringTokenizer(fields, " ,\t\n\r"));
+    return Arrays.asList(fields.split("[,\\s]+"));
   }
 
   public QueryFilters(Configuration conf) {
@@ -61,13 +61,14 @@
         if (point == null)
           throw new RuntimeException(QueryFilter.X_POINT_ID + " not found.");
         Extension[] extensions = point.getExtensions();
-        FIELD_NAMES = new HashSet();
-        RAW_FIELD_NAMES = new HashSet();
+        FIELD_NAMES = new HashSet<String>();
+        RAW_FIELD_NAMES = new HashSet<String>();
         QueryFilter[] filters = new QueryFilter[extensions.length];
         for (int i = 0; i < extensions.length; i++) {
           Extension extension = extensions[i];
-          ArrayList fieldNames = parseFieldNames(extension, "fields");
-          ArrayList rawFieldNames = parseFieldNames(extension, "raw-fields");
+          List<String> fieldNames = parseFieldNames(extension, "fields");
+          List<String> rawFieldNames =
+            parseFieldNames(extension, "raw-fields");
           if (fieldNames.size() == 0 && rawFieldNames.size() == 0) {
             if (LOG.isWarnEnabled()) {
               LOG.warn("QueryFilter: " + extension.getId()
@@ -90,8 +91,8 @@
           .getName());
     } else {
       // cache already filled
-      FIELD_NAMES = (HashSet) objectCache.getObject("FIELD_NAMES");
-      RAW_FIELD_NAMES = (HashSet) objectCache.getObject("RAW_FIELD_NAMES");
+      FIELD_NAMES = (HashSet<String>) objectCache.getObject("FIELD_NAMES");
+      RAW_FIELD_NAMES = (HashSet<String>) objectCache.getObject("RAW_FIELD_NAMES");
     }
   }              
 

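One subtlety in the StringTokenizer-to-split change above: String.split keeps empty tokens, so an empty attribute or a leading separator yields an empty field name where the tokenizer produced none. A defensive sketch of the same tokenizing step (class and method names are illustrative):

    import java.util.ArrayList;
    import java.util.List;

    public class FieldNameSplitter {
      // Splits on commas/whitespace but drops the empty tokens that
      // String.split keeps, matching the old StringTokenizer behavior.
      public static List<String> split(String fields) {
        final List<String> names = new ArrayList<String>();
        for (final String token : fields.split("[,\\s]+")) {
          if (token.length() > 0) {
            names.add(token);
          }
        }
        return names;
      }
    }
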
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,7 @@
+package org.apache.nutch.searcher;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+public interface RPCSearchBean extends SearchBean, VersionedProtocol {
+
+}

Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,7 @@
+package org.apache.nutch.searcher;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+public interface RPCSegmentBean extends SegmentBean, VersionedProtocol {
+
+}

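These two marker interfaces exist so that beans can be served over Hadoop IPC, which requires VersionedProtocol. A rough sketch of the wiring (the bind address, port, and versionID are made-up values, and the concrete bean must come from elsewhere; this is illustrative, not part of the commit):

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;

    // Hypothetical wiring in org.apache.nutch.searcher.
    public class RpcWiring {

      /** Server side: exports a concrete bean over Hadoop IPC. */
      public static RPC.Server serve(RPCSearchBean bean, int port,
                                     Configuration conf) throws Exception {
        final RPC.Server server = RPC.getServer(bean, "0.0.0.0", port, conf);
        server.start();
        return server;
      }

      /** Client side: a proxy that speaks the same protocol version. */
      public static RPCSearchBean connect(String host, int port,
                                          Configuration conf) throws Exception {
        return (RPCSearchBean) RPC.getProxy(
            RPCSearchBean.class, 1L /* assumed versionID */,
            new InetSocketAddress(host, port), conf);
      }
    }
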
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,12 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public interface SearchBean extends Searcher, HitDetailer {
+  public static final Log LOG = LogFactory.getLog(SearchBean.class);
+
+  public boolean ping() throws IOException;
+}

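Since ping() can both return false and throw, a front end needs to treat the two the same way. A minimal sketch of a guarded call (the class and the search arguments are illustrative; the search signature mirrors the Searcher interface used throughout this commit):

    import java.io.IOException;

    public class SafeSearch {
      /** Returns results only when the backend answers its ping;
       *  an unreachable backend is treated the same as a dead one. */
      public static Hits searchIfUp(SearchBean bean, Query query)
          throws IOException {
        boolean up;
        try {
          up = bean.ping();
        } catch (final IOException e) {
          up = false;
        }
        return up ? bean.search(query, 10, "site", null, false) : null;
      }
    }
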
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,8 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+
+public interface SegmentBean extends HitContent, HitSummarizer {
+
+  public String[] getSegmentNames() throws IOException;
+}

Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.ToStringUtils;
+import org.apache.nutch.indexer.solr.SolrWriter;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.SolrQuery.ORDER;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+
+public class SolrSearchBean implements SearchBean {
+
+  public static final Log LOG = LogFactory.getLog(SolrSearchBean.class);
+
+  private final SolrServer solr;
+
+  private final QueryFilters filters;
+
+  public SolrSearchBean(Configuration conf, String solrServer)
+  throws IOException {
+    solr = new CommonsHttpSolrServer(solrServer);
+    filters = new QueryFilters(conf);
+  }
+
+  public String getExplanation(Query query, Hit hit) throws IOException {
+    return "SOLR backend does not support explanations yet.";
+  }
+
+  @SuppressWarnings("unchecked")
+  public Hits search(Query query, int numHits, String dedupField,
+                     String sortField, boolean reverse)
+  throws IOException {
+
+    // filter query string
+    final BooleanQuery bQuery = filters.filter(query);
+
+    final SolrQuery solrQuery = new SolrQuery(stringify(bQuery));
+
+    solrQuery.setRows(numHits);
+
+    if (sortField == null) {
+      solrQuery.setFields(dedupField, "score", "id");
+      sortField = "score";
+    } else {
+      solrQuery.setFields(dedupField, sortField, "id");
+      solrQuery.setSortField(sortField, reverse ? ORDER.desc : ORDER.asc);
+    }
+
+    QueryResponse response;
+    try {
+      response = solr.query(solrQuery);
+    } catch (final SolrServerException e) {
+      throw SolrWriter.makeIOException(e);
+    }
+
+    final SolrDocumentList docList = response.getResults();
+
+    final Hit[] hitArr = new Hit[docList.size()];
+    for (int i = 0; i < hitArr.length; i++) {
+      final SolrDocument solrDoc = docList.get(i);
+
+      final Object raw = solrDoc.getFirstValue(sortField);
+      WritableComparable sortValue;
+
+      if (raw instanceof Integer) {
+        sortValue = new IntWritable(((Integer)raw).intValue());
+      } else if (raw instanceof Float) {
+        sortValue = new FloatWritable(((Float)raw).floatValue());
+      } else if (raw instanceof String) {
+        sortValue = new Text((String)raw);
+      } else if (raw instanceof Long) {
+        sortValue = new LongWritable(((Long)raw).longValue());
+      } else {
+        throw new RuntimeException("Unknown sort value type!");
+      }
+
+      final String dedupValue = (String) solrDoc.getFirstValue(dedupField);
+
+      final String uniqueKey = (String) solrDoc.getFirstValue("id");
+
+      hitArr[i] = new Hit(uniqueKey, sortValue, dedupValue);
+    }
+
+    return new Hits(docList.getNumFound(), hitArr);
+  }
+
+  public HitDetails getDetails(Hit hit) throws IOException {
+    QueryResponse response;
+    try {
+      response = solr.query(new SolrQuery("id:\"" + hit.getUniqueKey() + "\""));
+    } catch (final SolrServerException e) {
+      throw SolrWriter.makeIOException(e);
+    }
+
+    final SolrDocumentList docList = response.getResults();
+    if (docList.getNumFound() == 0) {
+      return null;
+    }
+
+    return buildDetails(docList.get(0));
+  }
+
+  public HitDetails[] getDetails(Hit[] hits) throws IOException {
+    final StringBuilder buf = new StringBuilder();
+    buf.append("(");
+    for (final Hit hit : hits) {
+      buf.append(" id:\"");
+      buf.append(hit.getUniqueKey());
+      buf.append("\"");
+    }
+    buf.append(")");
+
+    QueryResponse response;
+    try {
+      response = solr.query(new SolrQuery(buf.toString()));
+    } catch (final SolrServerException e) {
+      throw SolrWriter.makeIOException(e);
+    }
+
+    final SolrDocumentList docList = response.getResults();
+    if (docList.size() < hits.length) {
+      throw new RuntimeException("Missing hit details! Found: " +
+                                 docList.size() + ", expecting: " +
+                                 hits.length);
+    }
+
+    /* The response returned by the SOLR server may be out of
+     * order, so we make sure that the nth element of HitDetails[]
+     * is the detail of the nth hit.
+     */
+    final Map<String, HitDetails> detailsMap =
+      new HashMap<String, HitDetails>(hits.length);
+    for (final SolrDocument solrDoc : docList) {
+      final HitDetails details = buildDetails(solrDoc);
+      detailsMap.put(details.getValue("id"), details);
+    }
+
+    final HitDetails[] detailsArr = new HitDetails[hits.length];
+    for (int i = 0; i < hits.length; i++) {
+      detailsArr[i] = detailsMap.get(hits[i].getUniqueKey());
+    }
+
+    return detailsArr;
+  }
+
+  public boolean ping() throws IOException {
+    try {
+      return solr.ping().getStatus() == 0;
+    } catch (final SolrServerException e) {
+      throw SolrWriter.makeIOException(e);
+    }
+  }
+
+  public void close() throws IOException { }
+
+  private static HitDetails buildDetails(SolrDocument solrDoc) {
+    final List<String> fieldList = new ArrayList<String>();
+    final List<String> valueList = new ArrayList<String>();
+    for (final String field : solrDoc.getFieldNames()) {
+      for (final Object o : solrDoc.getFieldValues(field)) {
+        fieldList.add(field);
+        valueList.add(o.toString());
+      }
+    }
+
+    final String[] fields = fieldList.toArray(new String[fieldList.size()]);
+    final String[] values = valueList.toArray(new String[valueList.size()]);
+    return new HitDetails(fields, values);
+  }
+
+  /* Hackish solution for stringifying queries; code adapted from
+   * BooleanQuery. This is necessary because BooleanQuery.toString()
+   * produces clauses like feed:http://www.google.com, which the query
+   * parser cannot handle; we need feed:"http://www.google.com" instead.
+   */
+  private static String stringify(BooleanQuery bQuery) {
+    final StringBuilder buffer = new StringBuilder();
+    final boolean needParens = (bQuery.getBoost() != 1.0f) ||
+                               (bQuery.getMinimumNumberShouldMatch() > 0);
+    if (needParens) {
+      buffer.append("(");
+    }
+
+    final BooleanClause[] clauses  = bQuery.getClauses();
+    int i = 0;
+    for (final BooleanClause c : clauses) {
+      if (c.isProhibited())
+        buffer.append("-");
+      else if (c.isRequired())
+        buffer.append("+");
+
+      final org.apache.lucene.search.Query subQuery = c.getQuery();
+      if (subQuery instanceof BooleanQuery) {   // wrap sub-bools in parens
+        buffer.append("(");
+        buffer.append(c.getQuery().toString(""));
+        buffer.append(")");
+      } else if (subQuery instanceof TermQuery) {
+        final Term term = ((TermQuery) subQuery).getTerm();
+        buffer.append(term.field());
+        buffer.append(":\"");
+        buffer.append(term.text());
+        buffer.append("\"");
+      } else {
+        buffer.append(" ");
+        buffer.append(c.getQuery().toString());
+      }
+
+      if (i++ != clauses.length - 1) {
+        buffer.append(" ");
+      }
+    }
+
+    if (needParens) {
+      buffer.append(")");
+    }
+
+    if (bQuery.getMinimumNumberShouldMatch() > 0) {
+      buffer.append('~');
+      buffer.append(bQuery.getMinimumNumberShouldMatch());
+    }
+
+    if (bQuery.getBoost() != 1.0f) {
+      buffer.append(ToStringUtils.boost(bQuery.getBoost()));
+    }
+
+    return buffer.toString();
+  }
+
+}

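The quoting problem that stringify() works around is easy to reproduce with plain Lucene; a minimal sketch (the field name and URL are arbitrary examples):

    import org.apache.lucene.index.Term;
    import org.apache.lucene.search.BooleanClause;
    import org.apache.lucene.search.BooleanQuery;
    import org.apache.lucene.search.TermQuery;

    public class StringifyDemo {
      public static void main(String[] args) {
        final BooleanQuery bq = new BooleanQuery();
        bq.add(new TermQuery(new Term("feed", "http://www.google.com")),
               BooleanClause.Occur.MUST);

        // Prints +feed:http://www.google.com -- the unquoted colon in
        // the URL trips up the query parser on the Solr side.
        System.out.println(bq.toString());

        // stringify() above renders +feed:"http://www.google.com"
        // instead, which parses cleanly.
      }
    }
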
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java Mon Jan 12 05:26:16 2009
@@ -88,7 +88,7 @@
     public boolean isEllipsis() { return true; }
   }
 
-  private ArrayList fragments = new ArrayList();
+  private ArrayList<Fragment> fragments = new ArrayList<Fragment>();
 
   private static final Fragment[] FRAGMENT_PROTO = new Fragment[0];
 
@@ -100,7 +100,7 @@
 
   /** Returns an array of all of this summary's fragments.*/
   public Fragment[] getFragments() {
-    return (Fragment[])fragments.toArray(FRAGMENT_PROTO);
+    return fragments.toArray(FRAGMENT_PROTO);
   }
 
   /** Returns a String representation of this Summary. */
@@ -126,7 +126,7 @@
     Fragment fragment = null;
     StringBuffer buf = new StringBuffer();
     for (int i=0; i<fragments.size(); i++) {
-      fragment = (Fragment) fragments.get(i);
+      fragment = fragments.get(i);
       if (fragment.isHighlight()) {
         buf.append("<span class=\"highlight\">")
            .append(encode ? Entities.encode(fragment.getText())
@@ -185,7 +185,7 @@
     out.writeInt(fragments.size());
     Fragment fragment = null;
     for (int i=0; i<fragments.size(); i++) {
-      fragment = (Fragment) fragments.get(i);
+      fragment = fragments.get(i);
       if (fragment.isHighlight()) {
         out.writeByte(HIGHLIGHT);
         Text.writeString(out, fragment.getText());

Modified: lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java Mon Jan 12 05:26:16 2009
@@ -77,7 +77,7 @@
     }
 
     Hit hit = new Hit(Integer.parseInt(request.getParameter("idx")),
-                      Integer.parseInt(request.getParameter("id")));
+                      request.getParameter("id"));
     HitDetails details = bean.getDetails(hit);
 
     // raw bytes

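With Lucene document numbers gone, the id request parameter is now an arbitrary string key, so links pointing at this servlet should URL-encode it. A small sketch of building such a link (the helper class is hypothetical; getIndexNo() and getUniqueKey() are the accessors used above):

    import java.io.UnsupportedEncodingException;
    import java.net.URLEncoder;

    public class CachedLink {
      /** Builds the query string Cached expects; the unique key may
       *  contain URL-unsafe characters, hence the encoding. */
      public static String queryString(Hit hit)
          throws UnsupportedEncodingException {
        return "idx=" + hit.getIndexNo() +
               "&id=" + URLEncoder.encode(hit.getUniqueKey(), "UTF-8");
      }
    }
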
Modified: lucene/nutch/trunk/src/plugin/build.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/plugin/build.xml?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/plugin/build.xml (original)
+++ lucene/nutch/trunk/src/plugin/build.xml Mon Jan 12 05:26:16 2009
@@ -91,7 +91,7 @@
   <!-- Test all of the plugins.                               -->
   <!-- ====================================================== -->
   <target name="test">
-    <parallel threadCount="2">
+    <!--<parallel threadCount="2">-->
      <ant dir="creativecommons" target="test"/>
      <ant dir="languageidentifier" target="test"/>
      <ant dir="lib-http" target="test"/>
@@ -118,7 +118,7 @@
      <ant dir="urlnormalizer-basic" target="test"/>
      <ant dir="urlnormalizer-pass" target="test"/>
      <ant dir="urlnormalizer-regex" target="test"/>
-    </parallel>
+    <!--</parallel>-->
   </target>
 
   <!-- ====================================================== -->

Modified: lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java (original)
+++ lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java Mon Jan 12 05:26:16 2009
@@ -17,20 +17,19 @@
 
 package org.creativecommons.nutch;
 
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
 import org.apache.nutch.metadata.CreativeCommons;
 
 import org.apache.nutch.parse.Parse;
 
 import org.apache.nutch.indexer.IndexingFilter;
 import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
 import org.apache.hadoop.io.Text;
 
 import org.apache.nutch.crawl.CrawlDatum;
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.metadata.Metadata;
-import org.apache.nutch.metadata.CreativeCommons;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -50,7 +49,7 @@
 
   private Configuration conf;
 
-  public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
     throws IndexingException {
     
     Metadata metadata = parse.getData().getParseMeta();
@@ -86,7 +85,7 @@
   /** Add the features represented by a license URL.  Urls are of the form
    * "http://creativecommons.org/licenses/xx-xx/xx/xx", where "xx" names a
    * license feature. */
-  public void addUrlFeatures(Document doc, String urlString) {
+  public void addUrlFeatures(NutchDocument doc, String urlString) {
     try {
       URL url = new URL(urlString);
 
@@ -108,8 +107,12 @@
     }
   }
   
-  private void addFeature(Document doc, String feature) {
-    doc.add(new Field(FIELD, feature, Field.Store.YES, Field.Index.UN_TOKENIZED));
+  private void addFeature(NutchDocument doc, String feature) {
+    doc.add(FIELD, feature);
+  }
+
+  public void addIndexBackendOptions(Configuration conf) {
+    LuceneWriter.addFieldOptions(FIELD, LuceneWriter.STORE.YES, LuceneWriter.INDEX.UNTOKENIZED, conf);
   }
 
   public void setConf(Configuration conf) {