You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by do...@apache.org on 2009/01/12 14:26:35 UTC
svn commit: r733738 [2/3] - in /lucene/nutch/trunk: ./ bin/ lib/
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/indexer/
src/java/org/apache/nutch/indexer/lucene/
src/java/org/apache/nutch/indexer/solr/ src/java/org/apache/nutch/net/
src/ja...
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,326 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * {@link SearchBean} that distributes each query across a set of remote
+ * search back-ends (Lucene RPC proxies and/or Solr servers) in parallel
+ * and merges their results into a single ranked hit list.
+ *
+ * Liveness of each back-end is refreshed every 10 seconds by PingWorker
+ * tasks; searches skip servers currently marked dead.
+ */
+public class DistributedSearchBean implements SearchBean {
+
+ // Shared pool for per-server search/detail calls.
+ // NOTE(review): static but shut down in close(); closing one instance
+ // would disable all instances in this JVM — confirm this is intended.
+ private static final ExecutorService executor =
+ Executors.newCachedThreadPool();
+
+ // Schedules the periodic liveness pings (one thread per back-end).
+ private final ScheduledExecutorService pingService;
+
+ /** Runs one search against a single back-end, or returns null when the
+ * back-end is currently marked dead. Query arguments are installed per
+ * request via setSearchArgs() before the task is submitted. */
+ private class SearchTask implements Callable<Hits> {
+ private int id; // index into beans[] / liveServers[]
+
+ private Query query;
+ private int numHits;
+ private String dedupField;
+ private String sortField;
+ private boolean reverse;
+
+ public SearchTask(int id) {
+ this.id = id;
+ }
+
+ public Hits call() throws Exception {
+ if (!liveServers[id]) {
+ return null; // server considered dead; caller skips null results
+ }
+ return beans[id].search(query, numHits, dedupField, sortField, reverse);
+ }
+
+ public void setSearchArgs(Query query, int numHits, String dedupField,
+ String sortField, boolean reverse) {
+ this.query = query;
+ this.numHits = numHits;
+ this.dedupField = dedupField;
+ this.sortField = sortField;
+ this.reverse = reverse;
+ }
+
+ }
+
+ /** Fetches hit details from a single back-end; the hit batch is set per
+ * request via setHits() (null means "nothing for this server"). */
+ private class DetailTask implements Callable<HitDetails[]> {
+ private int id; // index into beans[]
+
+ private Hit[] hits;
+
+ public DetailTask(int id) {
+ this.id = id;
+ }
+
+ public HitDetails[] call() throws Exception {
+ if (hits == null) {
+ return null;
+ }
+ return beans[id].getDetails(hits);
+ }
+
+ public void setHits(Hit[] hits) {
+ this.hits = hits;
+ }
+
+ }
+
+ /** Periodic task that marks one back-end live/dead based on ping(). */
+ private class PingWorker implements Runnable {
+ private int id; // index into beans[] / liveServers[]
+
+ public PingWorker(int id) {
+ this.id = id;
+ }
+
+ public void run() {
+ try {
+ if (beans[id].ping()) {
+ liveServers[id] = true;
+ } else {
+ liveServers[id] = false;
+ }
+ } catch (IOException e) {
+ liveServers[id] = false; // unreachable server counts as dead
+ }
+ }
+ }
+
+ // Liveness flag per back-end, updated by PingWorker.
+ // NOTE(review): 'volatile' applies to the array reference only — writes
+ // to individual elements are not volatile. Staleness here only delays
+ // noticing a dead/revived server; confirm that is acceptable.
+ private volatile boolean liveServers[];
+
+ // One entry per remote back-end (Lucene RPC proxy or SolrSearchBean).
+ private SearchBean[] beans;
+
+ // Pre-built task objects, one per back-end, reused across requests.
+ private List<Callable<Hits>> searchTasks;
+
+ private List<Callable<HitDetails[]>> detailTasks;
+
+ private List<PingWorker> pingWorkers;
+
+ // Per-call timeout (ms) for invokeAll, taken from ipc.client.timeout.
+ private long timeout;
+
+ /**
+ * Builds the distributed bean from two optional server-list files.
+ *
+ * @param conf Hadoop/Nutch configuration
+ * @param luceneConfig path listing host:port Lucene RPC search servers
+ * @param solrConfig path listing Solr server URLs
+ * @throws IOException if a config file cannot be read or a proxy fails
+ */
+ public DistributedSearchBean(Configuration conf,
+ Path luceneConfig, Path solrConfig)
+ throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+
+ this.timeout = conf.getLong("ipc.client.timeout", 60000);
+
+ List<SearchBean> beanList = new ArrayList<SearchBean>();
+
+ if (fs.exists(luceneConfig)) {
+ addLuceneBeans(beanList, luceneConfig, conf);
+ }
+
+ if (fs.exists(solrConfig)) {
+ addSolrBeans(beanList, solrConfig, conf);
+ }
+
+ beans = beanList.toArray(new SearchBean[beanList.size()]);
+
+ // Assume every server is live until the first ping round says otherwise.
+ liveServers = new boolean[beans.length];
+ for (int i = 0; i < liveServers.length; i++) {
+ liveServers[i] = true;
+ }
+
+ searchTasks = new ArrayList<Callable<Hits>>();
+ detailTasks = new ArrayList<Callable<HitDetails[]>>();
+ pingWorkers = new ArrayList<PingWorker>();
+
+ for (int i = 0; i < beans.length; i++) {
+ searchTasks.add(new SearchTask(i));
+ detailTasks.add(new DetailTask(i));
+ pingWorkers.add(new PingWorker(i));
+ }
+
+ pingService = Executors.newScheduledThreadPool(beans.length);
+ for (PingWorker worker : pingWorkers) {
+ pingService.scheduleAtFixedRate(worker, 0, 10, TimeUnit.SECONDS);
+ }
+
+ }
+
+ /** Adds an RPC search proxy for every address listed in luceneConfig. */
+ private static void addLuceneBeans(List<SearchBean> beanList,
+ Path luceneConfig, Configuration conf)
+ throws IOException {
+ Configuration newConf = new Configuration(conf);
+
+ // do not retry connections
+ newConf.setInt("ipc.client.connect.max.retries", 0);
+
+ List<InetSocketAddress> luceneServers =
+ NutchBean.readAddresses(luceneConfig, conf);
+ for (InetSocketAddress addr : luceneServers) {
+ beanList.add((RPCSearchBean) RPC.getProxy(RPCSearchBean.class,
+ LuceneSearchBean.VERSION, addr, newConf));
+ }
+ }
+
+ /** Adds a SolrSearchBean for every server URL listed in solrConfig. */
+ private static void addSolrBeans(List<SearchBean> beanList,
+ Path solrConfig, Configuration conf)
+ throws IOException {
+ for (String solrServer : NutchBean.readConfig(solrConfig, conf)) {
+ beanList.add(new SolrSearchBean(conf, solrServer));
+ }
+ }
+
+ /** Delegates to the back-end that produced the hit (hit.getIndexNo()). */
+ public String getExplanation(Query query, Hit hit) throws IOException {
+ return beans[hit.getIndexNo()].getExplanation(query, hit);
+ }
+
+ /**
+ * Runs the query on all live back-ends in parallel (bounded by
+ * 'timeout' ms) and returns the global top numHits, re-tagging each hit
+ * with the index of the server it came from.
+ */
+ public Hits search(Query query, int numHits, String dedupField,
+ String sortField, boolean reverse) throws IOException {
+
+ for (Callable<Hits> task : searchTasks) {
+ ((SearchTask)task).setSearchArgs(query, numHits, dedupField, sortField,
+ reverse);
+ }
+
+ List<Future<Hits>> allHits;
+ try {
+ allHits =
+ executor.invokeAll(searchTasks, timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ PriorityQueue<Hit> queue; // cull top hits from results
+ // Top-k pattern: the queue's head is the candidate to evict on
+ // overflow, so the comparator is chosen opposite to the final output
+ // order produced by the reverse sort below.
+ if (sortField == null || reverse) {
+ queue = new PriorityQueue<Hit>(numHits);
+ } else {
+ queue = new PriorityQueue<Hit>(numHits, new Comparator<Hit>() {
+ public int compare(Hit h1, Hit h2) {
+ return h2.compareTo(h1); // reverse natural order
+ }
+ });
+ }
+
+ long totalHits = 0;
+ int allHitsSize = allHits.size();
+ for (int i = 0; i < allHitsSize; i++) {
+ Hits hits = null;
+ try {
+ hits = allHits.get(i).get();
+ } catch (InterruptedException e) {
+ // ignore
+ } catch (ExecutionException e) {
+ LOG.warn("Retrieving hits failed with exception: " +
+ StringUtils.stringifyException(e.getCause()));
+ }
+
+ if (hits == null) {
+ continue; // dead, failed or timed-out server: skip its results
+ }
+
+ totalHits += hits.getTotal();
+
+ int hitsLength = hits.getLength();
+ for (int j = 0; j < hitsLength; j++) {
+ Hit hit = hits.getHit(j);
+ // i is the position in searchTasks == index into beans[], so the
+ // merged hit remembers which server can serve its details later.
+ Hit newHit = new Hit(i, hit.getUniqueKey(),
+ hit.getSortValue(), hit.getDedupValue());
+ queue.add(newHit);
+ if (queue.size() > numHits) { // if hit queue overfull
+ queue.remove(); // evict the head (current eviction candidate)
+ }
+ }
+ }
+
+ // we have to sort results since PriorityQueue.toArray
+ // may not return results in sorted order
+ Hit[] culledResults = queue.toArray(new Hit[queue.size()]);
+ // Collections.reverseOrder(null) is documented to yield reverse
+ // natural order, matching the natural-ordered queue branch above.
+ Arrays.sort(culledResults, Collections.reverseOrder(queue.comparator()));
+
+ return new Hits(totalHits, culledResults);
+ }
+
+ /** Stops ping scheduling and closes every back-end bean. */
+ public void close() throws IOException {
+ executor.shutdown();
+ pingService.shutdown();
+ for (SearchBean bean : beans) {
+ bean.close();
+ }
+ }
+
+ public HitDetails getDetails(Hit hit) throws IOException {
+ return beans[hit.getIndexNo()].getDetails(hit);
+ }
+
+ /**
+ * Fetches details for a mixed batch of hits: hits are bucketed by
+ * originating server, fetched in parallel, then re-merged so that the
+ * returned array lines up index-for-index with 'hits'.
+ */
+ @SuppressWarnings("unchecked")
+ public HitDetails[] getDetails(Hit[] hits) throws IOException {
+ List<Hit>[] hitList = new ArrayList[detailTasks.size()];
+
+ for (int i = 0; i < hitList.length; i++) {
+ hitList[i] = new ArrayList<Hit>();
+ }
+
+ // Bucket hits by the server they came from.
+ for (int i = 0; i < hits.length; i++) {
+ Hit hit = hits[i];
+ hitList[hit.getIndexNo()].add(hit);
+ }
+
+ for (int i = 0; i < detailTasks.size(); i++) {
+ DetailTask task = (DetailTask)detailTasks.get(i);
+ if (hitList[i].size() > 0) {
+ task.setHits(hitList[i].toArray(new Hit[hitList[i].size()]));
+ } else {
+ task.setHits(null);
+ }
+ }
+
+ List<Future<HitDetails[]>> allDetails;
+ try {
+ allDetails =
+ executor.invokeAll(detailTasks, timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ /* getDetails(Hit[]) method assumes that HitDetails[i] returned corresponds
+ * to Hit[i] given as parameter. To keep this order, we have to 'merge'
+ * HitDetails[] returned from individual detailTasks.
+ */
+ HitDetails[][] detailsMatrix = new HitDetails[detailTasks.size()][];
+ for (int i = 0; i < detailsMatrix.length; i++) {
+ try {
+ detailsMatrix[i] = allDetails.get(i).get();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ int[] hitPos = new int[detailTasks.size()]; // keep track of where we are
+ HitDetails[] detailsArr = new HitDetails[hits.length];
+ for (int i = 0; i < detailsArr.length; i++) {
+ int indexNo = hits[i].getIndexNo();
+ // Per-server results come back in the same order the hits were
+ // bucketed, so a running cursor per server restores the input order.
+ detailsArr[i] = detailsMatrix[indexNo][(hitPos[indexNo]++)];
+ }
+
+ return detailsArr;
+ }
+
+ public boolean ping() {
+ return true; // not used
+ }
+
+}
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/DistributedSegmentBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,214 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.nutch.parse.ParseData;
+import org.apache.nutch.parse.ParseText;
+
+/**
+ * {@link SegmentBean} that delegates segment data requests (content,
+ * fetch date, parse data/text, summaries) to a set of remote segment
+ * servers. A periodically refreshed map from segment name to server
+ * index routes each request to the server holding that segment.
+ */
+public class DistributedSegmentBean implements SegmentBean {
+
+ // Shared pool used to fetch per-server summaries in parallel.
+ // NOTE(review): static but shut down in close(); closing one instance
+ // would disable all instances in this JVM — confirm this is intended.
+ private static final ExecutorService executor =
+ Executors.newCachedThreadPool();
+
+ // Schedules the periodic segment-map refresh (one thread per server).
+ private final ScheduledExecutorService pingService;
+
+ /** Fetches summaries for one server's share of a batch request.
+ * (Class name has a typo — "Summmary" — kept as-is here; worth a
+ * rename in a follow-up change.) */
+ private class DistSummmaryTask implements Callable<Summary[]> {
+ private int id; // index into beans[]
+
+ private HitDetails[] details;
+ private Query query;
+
+ public DistSummmaryTask(int id) {
+ this.id = id;
+ }
+
+ public Summary[] call() throws Exception {
+ if (details == null) {
+ return null; // nothing routed to this server for this request
+ }
+ return beans[id].getSummary(details, query);
+ }
+
+ public void setSummaryArgs(HitDetails[] details, Query query) {
+ this.details = details;
+ this.query = query;
+ }
+
+ }
+
+ /** Periodic task that refreshes segmentMap for one server: records the
+ * segments it currently serves, or removes all of its entries when the
+ * server cannot be reached. */
+ private class SegmentWorker implements Runnable {
+ private int id; // index into beans[]
+
+ public SegmentWorker(int id) {
+ this.id = id;
+ }
+
+ public void run() {
+ try {
+ String[] segments = beans[id].getSegmentNames();
+ for (String segment : segments) {
+ segmentMap.put(segment, id);
+ }
+ } catch (IOException e) {
+ // remove all segments this bean was serving
+ Iterator<Map.Entry<String, Integer>> i =
+ segmentMap.entrySet().iterator();
+ while (i.hasNext()) {
+ Map.Entry<String, Integer> entry = i.next();
+ int curId = entry.getValue();
+ if (curId == this.id) {
+ i.remove();
+ }
+ }
+ }
+ }
+ }
+
+ // Per-call timeout (ms) for invokeAll, taken from ipc.client.timeout.
+ private long timeout;
+
+ // One RPC proxy per remote segment server.
+ private SegmentBean[] beans;
+
+ // segment name -> index into beans[] of the server holding it.
+ private ConcurrentMap<String, Integer> segmentMap;
+
+ // Pre-built per-server summary tasks, reused across requests.
+ private List<Callable<Summary[]>> summaryTasks;
+
+ private List<SegmentWorker> segmentWorkers;
+
+ /**
+ * @param conf Hadoop/Nutch configuration
+ * @param serversConfig path listing host:port segment servers
+ * @throws IOException if the server list cannot be read
+ */
+ public DistributedSegmentBean(Configuration conf, Path serversConfig)
+ throws IOException {
+ this.timeout = conf.getLong("ipc.client.timeout", 60000);
+
+ List<SegmentBean> beanList = new ArrayList<SegmentBean>();
+
+ List<InetSocketAddress> segmentServers =
+ NutchBean.readAddresses(serversConfig, conf);
+
+ for (InetSocketAddress addr : segmentServers) {
+ SegmentBean bean = (RPCSegmentBean) RPC.getProxy(RPCSegmentBean.class,
+ FetchedSegments.VERSION, addr, conf);
+ beanList.add(bean);
+ }
+
+ beans = beanList.toArray(new SegmentBean[beanList.size()]);
+
+ summaryTasks = new ArrayList<Callable<Summary[]>>(beans.length);
+ segmentWorkers = new ArrayList<SegmentWorker>(beans.length);
+
+ for (int i = 0; i < beans.length; i++) {
+ summaryTasks.add(new DistSummmaryTask(i));
+ segmentWorkers.add(new SegmentWorker(i));
+ }
+
+ segmentMap = new ConcurrentHashMap<String, Integer>();
+
+ pingService = Executors.newScheduledThreadPool(beans.length);
+ for (SegmentWorker worker : segmentWorkers) {
+ pingService.scheduleAtFixedRate(worker, 0, 30, TimeUnit.SECONDS);
+ }
+ }
+
+ /** Routes a request to the server holding the hit's segment.
+ * NOTE(review): throws NullPointerException if the segment is not (yet)
+ * in segmentMap, e.g. before the first refresh completes — confirm
+ * callers can tolerate that. */
+ private SegmentBean getBean(HitDetails details) {
+ return beans[segmentMap.get(details.getValue("segment"))];
+ }
+
+ /** Returns the names of all segments currently known to be served. */
+ public String[] getSegmentNames() {
+ return segmentMap.keySet().toArray(new String[segmentMap.size()]);
+ }
+
+ public byte[] getContent(HitDetails details) throws IOException {
+ return getBean(details).getContent(details);
+ }
+
+ public long getFetchDate(HitDetails details) throws IOException {
+ return getBean(details).getFetchDate(details);
+ }
+
+ public ParseData getParseData(HitDetails details) throws IOException {
+ return getBean(details).getParseData(details);
+ }
+
+ public ParseText getParseText(HitDetails details) throws IOException {
+ return getBean(details).getParseText(details);
+ }
+
+ /** Stops the refresh schedule and closes every server proxy. */
+ public void close() throws IOException {
+ executor.shutdown();
+ pingService.shutdown();
+ for (SegmentBean bean : beans) {
+ bean.close();
+ }
+ }
+
+ public Summary getSummary(HitDetails details, Query query)
+ throws IOException {
+ return getBean(details).getSummary(details, query);
+ }
+
+ /**
+ * Batch summaries: details are bucketed by owning server, fetched in
+ * parallel (bounded by 'timeout' ms), and the per-server results are
+ * concatenated.
+ * NOTE(review): unlike DistributedSearchBean.getDetails(Hit[]), the
+ * returned array is NOT re-merged to line up index-for-index with
+ * detailsArr — confirm callers do not rely on that ordering.
+ */
+ @SuppressWarnings("unchecked")
+ public Summary[] getSummary(HitDetails[] detailsArr, Query query)
+ throws IOException {
+ List<HitDetails>[] detailsList = new ArrayList[summaryTasks.size()];
+ for (int i = 0; i < detailsList.length; i++) {
+ detailsList[i] = new ArrayList<HitDetails>();
+ }
+ // Bucket each HitDetails by the server holding its segment.
+ for (HitDetails details : detailsArr) {
+ detailsList[segmentMap.get(details.getValue("segment"))].add(details);
+ }
+ for (int i = 0; i < summaryTasks.size(); i++) {
+ DistSummmaryTask task = (DistSummmaryTask)summaryTasks.get(i);
+ if (detailsList[i].size() > 0) {
+ HitDetails[] taskDetails =
+ detailsList[i].toArray(new HitDetails[detailsList[i].size()]);
+ task.setSummaryArgs(taskDetails, query);
+ } else {
+ task.setSummaryArgs(null, null);
+ }
+ }
+
+ List<Future<Summary[]>> summaries;
+ try {
+ summaries =
+ executor.invokeAll(summaryTasks, timeout, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<Summary> summaryList = new ArrayList<Summary>();
+ for (Future<Summary[]> f : summaries) {
+ Summary[] summaryArray;
+ try {
+ summaryArray = f.get();
+ if (summaryArray == null) {
+ continue; // no details were routed to this server
+ }
+ for (Summary summary : summaryArray) {
+ summaryList.add(summary);
+ }
+ } catch (Exception e) {
+ // NOTE(review): getCause() may be null when the caught exception is
+ // not an ExecutionException (e.g. CancellationException after a
+ // timeout); such cases fall through to the RuntimeException below.
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new RuntimeException(e);
+ }
+ }
+
+ return summaryList.toArray(new Summary[summaryList.size()]);
+ }
+
+}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/FetchedSegments.java Mon Jan 12 05:26:16 2009
@@ -19,8 +19,16 @@
import java.io.IOException;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.*;
@@ -35,20 +43,89 @@
/** Implements {@link HitSummarizer} and {@link HitContent} for a set of
* fetched segments. */
-public class FetchedSegments implements HitSummarizer, HitContent {
+public class FetchedSegments implements RPCSegmentBean {
- private static class Segment implements Closeable {
-
- private static final Partitioner PARTITIONER = new HashPartitioner();
+ public static final long VERSION = 1L;
- private FileSystem fs;
- private Path segmentDir;
+ private static final ExecutorService executor =
+ Executors.newCachedThreadPool();
+
+ private class SummaryTask implements Callable<Summary> {
+ private final HitDetails details;
+ private final Query query;
+
+ public SummaryTask(HitDetails details, Query query) {
+ this.details = details;
+ this.query = query;
+ }
+
+ public Summary call() throws Exception {
+ return getSummary(details, query);
+ }
+ }
+
+ private class SegmentUpdater extends Thread {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ final FileStatus[] fstats = fs.listStatus(segmentsDir,
+ HadoopFSUtil.getPassDirectoriesFilter(fs));
+ final Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
+ final Iterator<Map.Entry<String, Segment>> i =
+ segments.entrySet().iterator();
+ while (i.hasNext()) {
+ final Map.Entry<String, Segment> entry = i.next();
+ final Segment seg = entry.getValue();
+ if (!fs.exists(seg.segmentDir)) {
+ try {
+ seg.close();
+ } catch (final Exception e) {
+ /* A segment may fail to close
+ * since it may already be deleted from
+ * file system. So we just ignore the
+ * exception and remove the mapping from
+ * 'segments'.
+ */
+ } finally {
+ i.remove();
+ }
+ }
+ }
+
+ if (segmentDirs != null) {
+ for (final Path segmentDir : segmentDirs) {
+ segments.putIfAbsent(segmentDir.getName(),
+ new Segment(fs, segmentDir, conf));
+ }
+ }
+
+ Thread.sleep(60000);
+ } catch (final InterruptedException e) {
+ // ignore
+ } catch (final IOException e) {
+ // ignore
+ }
+ }
+ }
+
+ }
+
+
+ private static class Segment implements java.io.Closeable {
+
+ private static final Partitioner<Text, Writable> PARTITIONER =
+ new HashPartitioner<Text, Writable>();
+
+ private final FileSystem fs;
+ private final Path segmentDir;
private MapFile.Reader[] content;
private MapFile.Reader[] parseText;
private MapFile.Reader[] parseData;
private MapFile.Reader[] crawl;
- private Configuration conf;
+ private final Configuration conf;
public Segment(FileSystem fs, Path segmentDir, Configuration conf) throws IOException {
this.fs = fs;
@@ -63,7 +140,7 @@
}
return (CrawlDatum)getEntry(crawl, url, new CrawlDatum());
}
-
+
public byte[] getContent(Text url) throws IOException {
synchronized (this) {
if (content == null)
@@ -87,7 +164,7 @@
}
return (ParseText)getEntry(parseText, url, new ParseText());
}
-
+
private MapFile.Reader[] getReaders(String subDir) throws IOException {
return MapFileOutputFormat.getReaders(fs, new Path(segmentDir, subDir), this.conf);
}
@@ -112,31 +189,37 @@
}
- private HashMap segments = new HashMap();
- private Summarizer summarizer;
+ private final ConcurrentMap<String, Segment> segments =
+ new ConcurrentHashMap<String, Segment>();
+ private final FileSystem fs;
+ private final Configuration conf;
+ private final Path segmentsDir;
+ private final SegmentUpdater segUpdater;
+ private final Summarizer summarizer;
/** Construct given a directory containing fetcher output. */
- public FetchedSegments(FileSystem fs, String segmentsDir, Configuration conf) throws IOException {
- FileStatus[] fstats = fs.listStatus(new Path(segmentsDir),
+ public FetchedSegments(Configuration conf, Path segmentsDir)
+ throws IOException {
+ this.conf = conf;
+ this.fs = FileSystem.get(this.conf);
+ final FileStatus[] fstats = fs.listStatus(segmentsDir,
HadoopFSUtil.getPassDirectoriesFilter(fs));
- Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
- this.summarizer = new SummarizerFactory(conf).getSummarizer();
+ final Path[] segmentDirs = HadoopFSUtil.getPaths(fstats);
+ this.summarizer = new SummarizerFactory(this.conf).getSummarizer();
+ this.segmentsDir = segmentsDir;
+ this.segUpdater = new SegmentUpdater();
if (segmentDirs != null) {
- for (int i = 0; i < segmentDirs.length; i++) {
- Path segmentDir = segmentDirs[i];
-// Path indexdone = new Path(segmentDir, IndexSegment.DONE_NAME);
-// if (fs.exists(indexdone) && fs.isFile(indexdone)) {
-// segments.put(segmentDir.getName(), new Segment(fs, segmentDir));
-// }
- segments.put(segmentDir.getName(), new Segment(fs, segmentDir, conf));
-
- }
+ for (final Path segmentDir : segmentDirs) {
+ segments.put(segmentDir.getName(),
+ new Segment(this.fs, segmentDir, this.conf));
+ }
}
+ this.segUpdater.start();
}
public String[] getSegmentNames() {
- return (String[])segments.keySet().toArray(new String[segments.size()]);
+ return segments.keySet().toArray(new String[segments.size()]);
}
public byte[] getContent(HitDetails details) throws IOException {
@@ -158,67 +241,57 @@
public Summary getSummary(HitDetails details, Query query)
throws IOException {
-
- if (this.summarizer == null) { return new Summary(); }
-
- Segment segment = getSegment(details);
- ParseText parseText = segment.getParseText(getUrl(details));
- String text = (parseText != null) ? parseText.getText() : "";
-
- return this.summarizer.getSummary(text, query);
- }
-
- private class SummaryThread extends Thread {
- private HitDetails details;
- private Query query;
-
- private Summary summary;
- private Throwable throwable;
- public SummaryThread(HitDetails details, Query query) {
- this.details = details;
- this.query = query;
- }
+ if (this.summarizer == null) { return new Summary(); }
- public void run() {
- try {
- this.summary = getSummary(details, query);
- } catch (Throwable throwable) {
- this.throwable = throwable;
- }
- }
+ final Segment segment = getSegment(details);
+ final ParseText parseText = segment.getParseText(getUrl(details));
+ final String text = (parseText != null) ? parseText.getText() : "";
+ return this.summarizer.getSummary(text, query);
}
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return VERSION;
+ }
public Summary[] getSummary(HitDetails[] details, Query query)
throws IOException {
- SummaryThread[] threads = new SummaryThread[details.length];
- for (int i = 0; i < threads.length; i++) {
- threads[i] = new SummaryThread(details[i], query);
- threads[i].start();
+ final List<Callable<Summary>> tasks =
+ new ArrayList<Callable<Summary>>(details.length);
+ for (int i = 0; i < details.length; i++) {
+ tasks.add(new SummaryTask(details[i], query));
}
- Summary[] results = new Summary[details.length];
- for (int i = 0; i < threads.length; i++) {
+ List<Future<Summary>> summaries;
+ try {
+ summaries = executor.invokeAll(tasks);
+ } catch (final InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+
+
+ final Summary[] results = new Summary[details.length];
+ for (int i = 0; i < details.length; i++) {
+ final Future<Summary> f = summaries.get(i);
+ Summary summary;
try {
- threads[i].join();
- } catch (InterruptedException e) {
+ summary = f.get();
+ } catch (final Exception e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
throw new RuntimeException(e);
}
- if (threads[i].throwable instanceof IOException) {
- throw (IOException)threads[i].throwable;
- } else if (threads[i].throwable != null) {
- throw new RuntimeException(threads[i].throwable);
- }
- results[i] = threads[i].summary;
+ results[i] = summary;
}
return results;
}
private Segment getSegment(HitDetails details) {
- return (Segment)segments.get(details.getValue("segment"));
+ return segments.get(details.getValue("segment"));
}
private Text getUrl(HitDetails details) {
@@ -230,10 +303,10 @@
}
public void close() throws IOException {
- Iterator iterator = segments.values().iterator();
+ final Iterator<Segment> iterator = segments.values().iterator();
while (iterator.hasNext()) {
- ((Segment) iterator.next()).close();
+ iterator.next().close();
}
}
-
+
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hit.java Mon Jan 12 05:26:16 2009
@@ -21,31 +21,33 @@
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
/** A document which matched a query in an index. */
-public class Hit implements Writable, Comparable {
+@SuppressWarnings("unchecked")
+public class Hit implements Writable, Comparable<Hit> {
private int indexNo; // index id
- private int indexDocNo; // index-relative id
+ private String uniqueKey;
private WritableComparable sortValue; // value sorted on
private String dedupValue; // value to dedup on
private boolean moreFromDupExcluded;
public Hit() {}
- public Hit(int indexNo, int indexDocNo) {
- this(indexNo, indexDocNo, null, null);
+ public Hit(int indexNo, String uniqueKey) {
+ this(indexNo, uniqueKey, null, null);
}
- public Hit(int indexNo, int indexDocNo,
- WritableComparable sortValue,
+ public Hit(int indexNo, String uniqueKey,
+ WritableComparable sortValue,
String dedupValue) {
- this(indexDocNo, sortValue, dedupValue);
+ this(uniqueKey, sortValue, dedupValue);
this.indexNo = indexNo;
}
- public Hit(int indexDocNo, WritableComparable sortValue, String dedupValue) {
- this.indexDocNo = indexDocNo;
+ public Hit(String uniqueKey, WritableComparable sortValue, String dedupValue) {
+ this.uniqueKey = uniqueKey;
this.sortValue = sortValue;
this.dedupValue = dedupValue == null ? "" : dedupValue;
}
@@ -54,8 +56,8 @@
public int getIndexNo() { return indexNo; }
public void setIndexNo(int indexNo) { this.indexNo = indexNo; }
- /** Return the document number of this hit within an index. */
- public int getIndexDocNo() { return indexDocNo; }
+ /** Return the unique identifier of this hit within an index. */
+ public String getUniqueKey() { return uniqueKey; }
/** Return the value of the field that hits are sorted on. */
public WritableComparable getSortValue() { return sortValue; }
@@ -73,39 +75,26 @@
/** Display as a string. */
public String toString() {
- return "#" + indexDocNo;
+ return "#" + uniqueKey;
}
- public boolean equals(Object o) {
- if (!(o instanceof Hit))
- return false;
- Hit other = (Hit)o;
- return this.indexNo == other.indexNo
- && this.indexDocNo == other.indexDocNo;
- }
-
- public int hashCode() {
- return indexNo ^ indexDocNo;
- }
-
- public int compareTo(Object o) {
- Hit other = (Hit)o;
+ public int compareTo(Hit other) {
int compare = sortValue.compareTo(other.sortValue);
if (compare != 0) {
return compare; // use sortValue
} else if (other.indexNo != this.indexNo) {
return other.indexNo - this.indexNo; // prefer later indexes
} else {
- return other.indexDocNo - this.indexDocNo; // prefer later docs
+ return other.uniqueKey.compareTo(this.uniqueKey); // prefer later doc
}
}
public void write(DataOutput out) throws IOException {
- out.writeInt(indexDocNo);
+ Text.writeString(out, uniqueKey);
}
public void readFields(DataInput in) throws IOException {
- indexDocNo = in.readInt();
+ uniqueKey = Text.readString(in);
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/HitDetails.java Mon Jan 12 05:26:16 2009
@@ -73,14 +73,14 @@
/** Returns all the values with the specified name. */
public String[] getValues(String field) {
- ArrayList vals = new ArrayList();
+ ArrayList<String> vals = new ArrayList<String>();
for (int i=0; i<length; i++) {
if (fields[i].equals(field)) {
vals.add(values[i]);
}
}
return (vals.size() > 0)
- ? (String[]) vals.toArray(new String[vals.size()])
+ ? vals.toArray(new String[vals.size()])
: null;
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Hits.java Mon Jan 12 05:26:16 2009
@@ -65,7 +65,6 @@
return results;
}
-
public void write(DataOutput out) throws IOException {
out.writeLong(total); // write total hits
out.writeInt(top.length); // write hits returned
@@ -74,12 +73,13 @@
for (int i = 0; i < top.length; i++) {
Hit h = top[i];
- out.writeInt(h.getIndexDocNo()); // write indexDocNo
+ Text.writeString(out, h.getUniqueKey()); // write uniqueKey
h.getSortValue().write(out); // write sortValue
Text.writeString(out, h.getDedupValue()); // write dedupValue
}
}
+ @SuppressWarnings("unchecked")
public void readFields(DataInput in) throws IOException {
total = in.readLong(); // read total hits
top = new Hit[in.readInt()]; // read hits returned
@@ -93,7 +93,7 @@
}
for (int i = 0; i < top.length; i++) {
- int indexDocNo = in.readInt(); // read indexDocNo
+ String uniqueKey = Text.readString(in); // read uniqueKey
WritableComparable sortValue = null;
try {
@@ -105,7 +105,7 @@
String dedupValue = Text.readString(in); // read dedupValue
- top[i] = new Hit(indexDocNo, sortValue, dedupValue);
+ top[i] = new Hit(uniqueKey, sortValue, dedupValue);
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/IndexSearcher.java Mon Jan 12 05:26:16 2009
@@ -103,12 +103,12 @@
public String getExplanation(Query query, Hit hit) throws IOException {
return luceneSearcher.explain(this.queryFilters.filter(query),
- hit.getIndexDocNo()).toHtml();
+ Integer.valueOf(hit.getUniqueKey())).toHtml();
}
public HitDetails getDetails(Hit hit) throws IOException {
- Document doc = luceneSearcher.doc(hit.getIndexDocNo());
+ Document doc = luceneSearcher.doc(Integer.valueOf(hit.getUniqueKey()));
List docFields = doc.getFields();
String[] fields = new String[docFields.size()];
@@ -162,7 +162,7 @@
String dedupValue = dedupValues == null ? null : dedupValues[doc];
- hits[i] = new Hit(doc, sortValue, dedupValue);
+ hits[i] = new Hit(Integer.toString(doc), sortValue, dedupValue);
}
return new Hits(topDocs.totalHits, hits);
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneQueryOptimizer.java Mon Jan 12 05:26:16 2009
@@ -18,7 +18,6 @@
package org.apache.nutch.searcher;
import org.apache.lucene.search.Searcher;
-import org.apache.lucene.search.QueryFilter;
import org.apache.lucene.search.*;
import org.apache.lucene.index.Term;
import org.apache.lucene.misc.ChainedFilter;
@@ -83,6 +82,7 @@
}
+ @SuppressWarnings("serial")
private static class TimeExceeded extends RuntimeException {
public long maxTime;
private int maxDoc;
@@ -127,7 +127,8 @@
}
}
- private static class LimitExceeded extends RuntimeException {
+ @SuppressWarnings("serial")
+private static class LimitExceeded extends RuntimeException {
private int maxDoc;
public LimitExceeded(int maxDoc) { this.maxDoc = maxDoc; }
}
@@ -151,13 +152,14 @@
* @param threshold
* the fraction of documents which must contain a term
*/
- public LuceneQueryOptimizer(Configuration conf) {
+ @SuppressWarnings("serial")
+public LuceneQueryOptimizer(Configuration conf) {
final int cacheSize = conf.getInt("searcher.filter.cache.size", 16);
this.threshold = conf.getFloat("searcher.filter.cache.threshold",
0.05f);
this.searcherMaxHits = conf.getInt("searcher.max.hits", -1);
this.cache = new LinkedHashMap<BooleanQuery, Filter>(cacheSize, 0.75f, true) {
- protected boolean removeEldestEntry(Map.Entry eldest) {
+ protected boolean removeEldestEntry(Map.Entry<BooleanQuery, Filter> eldest) {
return size() > cacheSize; // limit size of cache
}
};
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/LuceneSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nutch.indexer.Indexer;
+import org.apache.nutch.util.HadoopFSUtil;
+
+public class LuceneSearchBean implements RPCSearchBean {
+
+ public static final long VERSION = 1L;
+
+ private IndexSearcher searcher;
+
+ private FileSystem fs;
+
+ private Configuration conf;
+
+ /**
+ * Construct in a named directory.
+ * @param conf
+ * @param dir
+ * @throws IOException
+ */
+ public LuceneSearchBean(Configuration conf, Path indexDir, Path indexesDir)
+ throws IOException {
+ this.conf = conf;
+ this.fs = FileSystem.get(this.conf);
+ init(indexDir, indexesDir);
+ }
+
+ private void init(Path indexDir, Path indexesDir)
+ throws IOException {
+ if (this.fs.exists(indexDir)) {
+ LOG.info("opening merged index in " + indexDir);
+ this.searcher = new IndexSearcher(indexDir, this.conf);
+ } else {
+ LOG.info("opening indexes in " + indexesDir);
+
+ List<Path> vDirs = new ArrayList<Path>();
+ FileStatus[] fstats = fs.listStatus(indexesDir, HadoopFSUtil.getPassDirectoriesFilter(fs));
+ Path[] directories = HadoopFSUtil.getPaths(fstats);
+ for(int i = 0; i < directories.length; i++) {
+ Path indexdone = new Path(directories[i], Indexer.DONE_NAME);
+ if(fs.isFile(indexdone)) {
+ vDirs.add(directories[i]);
+ }
+ }
+
+ directories = new Path[ vDirs.size() ];
+ for(int i = 0; vDirs.size()>0; i++) {
+ directories[i] = vDirs.remove(0);
+ }
+
+ this.searcher = new IndexSearcher(directories, this.conf);
+ }
+ }
+
+ public Hits search(Query query, int numHits, String dedupField,
+ String sortField, boolean reverse)
+ throws IOException {
+ return searcher.search(query, numHits, dedupField, sortField, reverse);
+ }
+
+ public String getExplanation(Query query, Hit hit) throws IOException {
+ return searcher.getExplanation(query, hit);
+ }
+
+ public HitDetails getDetails(Hit hit) throws IOException {
+ return searcher.getDetails(hit);
+ }
+
+ public HitDetails[] getDetails(Hit[] hits) throws IOException {
+ return searcher.getDetails(hits);
+ }
+
+ public boolean ping() throws IOException {
+ return true;
+ }
+
+ public void close() throws IOException {
+ if (searcher != null) { searcher.close(); }
+ if (fs != null) { fs.close(); }
+ }
+
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
+ return VERSION;
+ }
+
+}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/NutchBean.java Mon Jan 12 05:26:16 2009
@@ -18,29 +18,28 @@
package org.apache.nutch.searcher;
import java.io.*;
+import java.net.InetSocketAddress;
import java.util.*;
+
import javax.servlet.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.Closeable;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.StringUtils;
import org.apache.nutch.parse.*;
-import org.apache.nutch.indexer.*;
import org.apache.nutch.crawl.Inlinks;
-import org.apache.nutch.util.HadoopFSUtil;
import org.apache.nutch.util.NutchConfiguration;
-/**
+/**
* One stop shopping for search-related functionality.
* @version $Id: NutchBean.java,v 1.19 2005/02/07 19:10:08 cutting Exp $
- */
+ */
public class NutchBean
- implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks,
- DistributedSearch.Protocol, Closeable {
+implements SearchBean, SegmentBean, HitInlinks, Closeable {
public static final Log LOG = LogFactory.getLog(NutchBean.class);
public static final String KEY = "nutchBean";
@@ -51,122 +50,105 @@
private String[] segmentNames;
- private Searcher searcher;
- private HitDetailer detailer;
- private HitSummarizer summarizer;
- private HitContent content;
- private HitInlinks linkDb;
+ private SearchBean searchBean;
+ private SegmentBean segmentBean;
+ private final HitInlinks linkDb;
/** BooleanQuery won't permit more than 32 required/prohibited clauses. We
- * don't want to use too many of those. */
+ * don't want to use too many of those. */
private static final int MAX_PROHIBITED_TERMS = 20;
-
- private Configuration conf;
- private FileSystem fs;
+ private final Configuration conf;
+
+ private final FileSystem fs;
- /** Returns the cached instance in the servlet context.
+ /** Returns the cached instance in the servlet context.
* @see NutchBeanConstructor*/
public static NutchBean get(ServletContext app, Configuration conf) throws IOException {
- NutchBean bean = (NutchBean)app.getAttribute(KEY);
+ final NutchBean bean = (NutchBean)app.getAttribute(KEY);
return bean;
}
/**
- *
+ *
* @param conf
* @throws IOException
*/
public NutchBean(Configuration conf) throws IOException {
this(conf, null);
}
-
+
/**
- * Construct in a named directory.
+ * Construct in a named directory.
+ *
* @param conf
* @param dir
* @throws IOException
*/
public NutchBean(Configuration conf, Path dir) throws IOException {
- this.conf = conf;
- this.fs = FileSystem.get(this.conf);
- if (dir == null) {
- dir = new Path(this.conf.get("searcher.dir", "crawl"));
- }
- Path servers = new Path(dir, "search-servers.txt");
- if (fs.exists(servers)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("searching servers in " + servers);
- }
- init(new DistributedSearch.Client(servers, conf));
- } else {
- init(new Path(dir, "index"), new Path(dir, "indexes"), new Path(
- dir, "segments"), new Path(dir, "linkdb"));
- }
+ this.conf = conf;
+ this.fs = FileSystem.get(this.conf);
+ if (dir == null) {
+ dir = new Path(this.conf.get("searcher.dir", "crawl"));
}
+ final Path luceneConfig = new Path(dir, "search-servers.txt");
+ final Path solrConfig = new Path(dir, "solr-servers.txt");
+ final Path segmentConfig = new Path(dir, "segment-servers.txt");
- private void init(Path indexDir, Path indexesDir, Path segmentsDir,
- Path linkDb)
- throws IOException {
- IndexSearcher indexSearcher;
- if (this.fs.exists(indexDir)) {
- if (LOG.isInfoEnabled()) {
- LOG.info("opening merged index in " + indexDir);
- }
- indexSearcher = new IndexSearcher(indexDir, this.conf);
+ if (fs.exists(luceneConfig) || fs.exists(solrConfig)) {
+ searchBean = new DistributedSearchBean(conf, luceneConfig, solrConfig);
} else {
- if (LOG.isInfoEnabled()) {
- LOG.info("opening indexes in " + indexesDir);
- }
-
- Vector vDirs=new Vector();
- FileStatus[] fstats = fs.listStatus(indexesDir,
- HadoopFSUtil.getPassDirectoriesFilter(fs));
- Path [] directories = HadoopFSUtil.getPaths(fstats);
- for(int i = 0; i < directories.length; i++) {
- Path indexdone = new Path(directories[i], Indexer.DONE_NAME);
- if(fs.isFile(indexdone)) {
- vDirs.add(directories[i]);
+ final Path indexDir = new Path(dir, "index");
+ final Path indexesDir = new Path(dir, "indexes");
+ searchBean = new LuceneSearchBean(conf, indexDir, indexesDir);
+ }
+
+ if (fs.exists(segmentConfig)) {
+ segmentBean = new DistributedSegmentBean(conf, segmentConfig);
+ } else if (fs.exists(luceneConfig)) {
+ segmentBean = new DistributedSegmentBean(conf, luceneConfig);
+ } else {
+ segmentBean = new FetchedSegments(conf, new Path(dir, "segments"));
+ }
+
+ linkDb = new LinkDbInlinks(fs, new Path(dir, "linkdb"), conf);
+ }
+
+ public static List<InetSocketAddress> readAddresses(Path path,
+ Configuration conf) throws IOException {
+ final List<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+ for (final String line : readConfig(path, conf)) {
+ final StringTokenizer tokens = new StringTokenizer(line);
+ if (tokens.hasMoreTokens()) {
+ final String host = tokens.nextToken();
+ if (tokens.hasMoreTokens()) {
+ final String port = tokens.nextToken();
+ addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
}
}
-
-
- directories = new Path[ vDirs.size() ];
- for(int i = 0; vDirs.size()>0; i++) {
- directories[i]=(Path)vDirs.remove(0);
- }
-
- indexSearcher = new IndexSearcher(directories, this.conf);
}
+ return addrs;
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info("opening segments in " + segmentsDir);
+ public static List<String> readConfig(Path path, Configuration conf)
+ throws IOException {
+ final FileSystem fs = FileSystem.get(conf);
+ final BufferedReader reader =
+ new BufferedReader(new InputStreamReader(fs.open(path)));
+ try {
+ final ArrayList<String> addrs = new ArrayList<String>();
+ String line;
+ while ((line = reader.readLine()) != null) {
+ addrs.add(line);
+ }
+ return addrs;
+ } finally {
+ reader.close();
}
- FetchedSegments segments = new FetchedSegments(this.fs, segmentsDir.toString(),this.conf);
-
- this.segmentNames = segments.getSegmentNames();
-
- this.searcher = indexSearcher;
- this.detailer = indexSearcher;
- this.summarizer = segments;
- this.content = segments;
-
- if (LOG.isInfoEnabled()) { LOG.info("opening linkdb in " + linkDb); }
- this.linkDb = new LinkDbInlinks(fs, linkDb, this.conf);
- }
-
- private void init(DistributedSearch.Client client) {
- this.segmentNames = client.getSegmentNames();
- this.searcher = client;
- this.detailer = client;
- this.summarizer = client;
- this.content = client;
- this.linkDb = client;
}
-
public String[] getSegmentNames() {
return segmentNames;
}
@@ -174,15 +156,16 @@
public Hits search(Query query, int numHits) throws IOException {
return search(query, numHits, null, null, false);
}
-
+
public Hits search(Query query, int numHits,
String dedupField, String sortField, boolean reverse)
throws IOException {
- return searcher.search(query, numHits, dedupField, sortField, reverse);
+ return searchBean.search(query, numHits, dedupField, sortField, reverse);
}
-
- private class DupHits extends ArrayList {
+
+ @SuppressWarnings("serial")
+ private class DupHits extends ArrayList<Hit> {
private boolean maxSizeExceeded;
}
@@ -191,7 +174,7 @@
* site are removed from results. The remaining hits have {@link
* Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero then all
* hits are returned.
- *
+ *
* @param query query
* @param numHits number of requested hits
* @param maxHitsPerDup the maximum hits returned with matching values, or zero
@@ -208,7 +191,7 @@
* <code>maxHitsPerDup</code> are removed from results. The remaining hits
* have {@link Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero
* then all hits are returned.
- *
+ *
* @param query query
* @param numHits number of requested hits
* @param maxHitsPerDup the maximum hits returned with matching values, or zero
@@ -226,7 +209,7 @@
* <code>maxHitsPerDup</code> are removed from results. The remaining hits
* have {@link Hit#moreFromDupExcluded()} set. <p> If maxHitsPerDup is zero
* then all hits are returned.
- *
+ *
* @param query query
* @param numHits number of requested hits
* @param maxHitsPerDup the maximum hits returned with matching values, or zero
@@ -243,35 +226,35 @@
if (maxHitsPerDup <= 0) // disable dup checking
return search(query, numHits, dedupField, sortField, reverse);
- float rawHitsFactor = this.conf.getFloat("searcher.hostgrouping.rawhits.factor", 2.0f);
+ final float rawHitsFactor = this.conf.getFloat("searcher.hostgrouping.rawhits.factor", 2.0f);
int numHitsRaw = (int)(numHits * rawHitsFactor);
if (LOG.isInfoEnabled()) {
LOG.info("searching for "+numHitsRaw+" raw hits");
}
- Hits hits = searcher.search(query, numHitsRaw,
+ Hits hits = searchBean.search(query, numHitsRaw,
dedupField, sortField, reverse);
- long total = hits.getTotal();
- Map dupToHits = new HashMap();
- List resultList = new ArrayList();
- Set seen = new HashSet();
- List excludedValues = new ArrayList();
+ final long total = hits.getTotal();
+ final Map<String, DupHits> dupToHits = new HashMap<String, DupHits>();
+ final List<Hit> resultList = new ArrayList<Hit>();
+ final Set<Hit> seen = new HashSet<Hit>();
+ final List<String> excludedValues = new ArrayList<String>();
boolean totalIsExact = true;
for (int rawHitNum = 0; rawHitNum < hits.getTotal(); rawHitNum++) {
// get the next raw hit
if (rawHitNum >= hits.getLength()) {
// optimize query by prohibiting more matches on some excluded values
- Query optQuery = (Query)query.clone();
+ final Query optQuery = (Query)query.clone();
for (int i = 0; i < excludedValues.size(); i++) {
if (i == MAX_PROHIBITED_TERMS)
break;
- optQuery.addProhibitedTerm(((String)excludedValues.get(i)),
+ optQuery.addProhibitedTerm(excludedValues.get(i),
dedupField);
}
numHitsRaw = (int)(numHitsRaw * rawHitsFactor);
if (LOG.isInfoEnabled()) {
LOG.info("re-searching for "+numHitsRaw+" raw hits, query: "+optQuery);
}
- hits = searcher.search(optQuery, numHitsRaw,
+ hits = searchBean.search(optQuery, numHitsRaw,
dedupField, sortField, reverse);
if (LOG.isInfoEnabled()) {
LOG.info("found "+hits.getTotal()+" raw hits");
@@ -280,14 +263,14 @@
continue;
}
- Hit hit = hits.getHit(rawHitNum);
+ final Hit hit = hits.getHit(rawHitNum);
if (seen.contains(hit))
continue;
seen.add(hit);
-
+
// get dup hits for its value
- String value = hit.getDedupValue();
- DupHits dupHits = (DupHits)dupToHits.get(value);
+ final String value = hit.getDedupValue();
+ DupHits dupHits = dupToHits.get(value);
if (dupHits == null)
dupToHits.put(value, dupHits = new DupHits());
@@ -297,7 +280,7 @@
// mark prior hits with moreFromDupExcluded
for (int i = 0; i < dupHits.size(); i++) {
- ((Hit)dupHits.get(i)).setMoreFromDupExcluded(true);
+ dupHits.get(i).setMoreFromDupExcluded(true);
}
dupHits.maxSizeExceeded = true;
@@ -316,45 +299,45 @@
}
}
- Hits results =
+ final Hits results =
new Hits(total,
- (Hit[])resultList.toArray(new Hit[resultList.size()]));
+ resultList.toArray(new Hit[resultList.size()]));
results.setTotalIsExact(totalIsExact);
return results;
}
-
+
public String getExplanation(Query query, Hit hit) throws IOException {
- return searcher.getExplanation(query, hit);
+ return searchBean.getExplanation(query, hit);
}
public HitDetails getDetails(Hit hit) throws IOException {
- return detailer.getDetails(hit);
+ return searchBean.getDetails(hit);
}
public HitDetails[] getDetails(Hit[] hits) throws IOException {
- return detailer.getDetails(hits);
+ return searchBean.getDetails(hits);
}
public Summary getSummary(HitDetails hit, Query query) throws IOException {
- return summarizer.getSummary(hit, query);
+ return segmentBean.getSummary(hit, query);
}
public Summary[] getSummary(HitDetails[] hits, Query query)
throws IOException {
- return summarizer.getSummary(hits, query);
+ return segmentBean.getSummary(hits, query);
}
public byte[] getContent(HitDetails hit) throws IOException {
- return content.getContent(hit);
+ return segmentBean.getContent(hit);
}
public ParseData getParseData(HitDetails hit) throws IOException {
- return content.getParseData(hit);
+ return segmentBean.getParseData(hit);
}
public ParseText getParseText(HitDetails hit) throws IOException {
- return content.getParseText(hit);
+ return segmentBean.getParseText(hit);
}
public String[] getAnchors(HitDetails hit) throws IOException {
@@ -366,67 +349,80 @@
}
public long getFetchDate(HitDetails hit) throws IOException {
- return content.getFetchDate(hit);
+ return segmentBean.getFetchDate(hit);
}
public void close() throws IOException {
- if (content != null) { content.close(); }
- if (searcher != null) { searcher.close(); }
+ if (searchBean != null) { searchBean.close(); }
+ if (segmentBean != null) { segmentBean.close(); }
if (linkDb != null) { linkDb.close(); }
if (fs != null) { fs.close(); }
}
-
+
+ public boolean ping() {
+ return true;
+ }
+
/** For debugging. */
public static void main(String[] args) throws Exception {
- String usage = "NutchBean query";
+ final String usage = "NutchBean query";
if (args.length == 0) {
System.err.println(usage);
System.exit(-1);
}
- Configuration conf = NutchConfiguration.create();
- NutchBean bean = new NutchBean(conf);
- Query query = Query.parse(args[0], conf);
- Hits hits = bean.search(query, 10);
+ final Configuration conf = NutchConfiguration.create();
+ final NutchBean bean = new NutchBean(conf);
+ final Query query = Query.parse(args[0], conf);
+ final Hits hits = bean.search(query, 10);
System.out.println("Total hits: " + hits.getTotal());
- int length = (int)Math.min(hits.getTotal(), 10);
- Hit[] show = hits.getHits(0, length);
- HitDetails[] details = bean.getDetails(show);
- Summary[] summaries = bean.getSummary(details, query);
+ final int length = (int)Math.min(hits.getTotal(), 10);
+ final Hit[] show = hits.getHits(0, length);
+ final HitDetails[] details = bean.getDetails(show);
+ final Summary[] summaries = bean.getSummary(details, query);
for (int i = 0; i < hits.getLength(); i++) {
- System.out.println(" "+i+" "+ details[i] + "\n" + summaries[i]);
+ System.out.println(" " + i + " " + details[i] + "\n" + summaries[i]);
}
}
- public long getProtocolVersion(String className, long arg1) throws IOException {
- if(DistributedSearch.Protocol.class.getName().equals(className)){
- return 1;
+ public long getProtocolVersion(String className, long clientVersion)
+ throws IOException {
+ if(RPCSearchBean.class.getName().equals(className) &&
+ searchBean instanceof RPCSearchBean) {
+
+ final RPCSearchBean rpcBean = (RPCSearchBean)searchBean;
+ return rpcBean.getProtocolVersion(className, clientVersion);
+ } else if (SegmentBean.class.getName().equals(className) &&
+ segmentBean instanceof RPCSegmentBean) {
+
+ final RPCSegmentBean rpcBean = (RPCSegmentBean)segmentBean;
+ return rpcBean.getProtocolVersion(className, clientVersion);
} else {
throw new IOException("Unknown Protocol classname:" + className);
}
}
- /** Responsible for constructing a NutchBean singleton instance and
- * caching it in the servlet context. This class should be registered in
- * the deployment descriptor as a listener
+ /** Responsible for constructing a NutchBean singleton instance and
+ * caching it in the servlet context. This class should be registered in
+ * the deployment descriptor as a listener
*/
public static class NutchBeanConstructor implements ServletContextListener {
-
+
public void contextDestroyed(ServletContextEvent sce) { }
public void contextInitialized(ServletContextEvent sce) {
- ServletContext app = sce.getServletContext();
- Configuration conf = NutchConfiguration.get(app);
-
+ final ServletContext app = sce.getServletContext();
+ final Configuration conf = NutchConfiguration.get(app);
+
LOG.info("creating new bean");
NutchBean bean = null;
try {
bean = new NutchBean(conf);
app.setAttribute(KEY, bean);
}
- catch (IOException ex) {
+ catch (final IOException ex) {
LOG.error(StringUtils.stringifyException(ex));
}
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/OpenSearchServlet.java Mon Jan 12 05:26:16 2009
@@ -43,8 +43,10 @@
/** Present search results using A9's OpenSearch extensions to RSS, plus a few
* Nutch-specific extensions. */
+@SuppressWarnings("serial")
public class OpenSearchServlet extends HttpServlet {
- private static final Map NS_MAP = new HashMap();
+ private static final Map<String, String> NS_MAP =
+ new HashMap<String, String>();
private int MAX_HITS_PER_PAGE;
static {
@@ -52,7 +54,7 @@
NS_MAP.put("nutch", "http://www.nutch.org/opensearchrss/1.0/");
}
- private static final Set SKIP_DETAILS = new HashSet();
+ private static final Set<String> SKIP_DETAILS = new HashSet<String>();
static {
SKIP_DETAILS.add("url"); // redundant with RSS link
SKIP_DETAILS.add("title"); // redundant with RSS title
@@ -171,8 +173,8 @@
Element rss = addNode(doc, doc, "rss");
addAttribute(doc, rss, "version", "2.0");
addAttribute(doc, rss, "xmlns:opensearch",
- (String)NS_MAP.get("opensearch"));
- addAttribute(doc, rss, "xmlns:nutch", (String)NS_MAP.get("nutch"));
+ NS_MAP.get("opensearch"));
+ addAttribute(doc, rss, "xmlns:nutch", NS_MAP.get("nutch"));
Element channel = addNode(doc, rss, "channel");
@@ -214,7 +216,7 @@
HitDetails detail = details[i];
String title = detail.getValue("title");
String url = detail.getValue("url");
- String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getIndexDocNo();
+ String id = "idx=" + hit.getIndexNo() + "&id=" + hit.getUniqueKey();
if (title == null || title.equals("")) { // use url for docs w/o title
title = url;
@@ -283,7 +285,7 @@
private static void addNode(Document doc, Node parent,
String ns, String name, String text) {
- Element child = doc.createElementNS((String)NS_MAP.get(ns), ns+":"+name);
+ Element child = doc.createElementNS(NS_MAP.get(ns), ns+":"+name);
child.appendChild(doc.createTextNode(getLegalXml(text)));
parent.appendChild(child);
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Query.java Mon Jan 12 05:26:16 2009
@@ -282,7 +282,7 @@
}
- private ArrayList clauses = new ArrayList();
+ private ArrayList<Clause> clauses = new ArrayList<Clause>();
private Configuration conf;
@@ -305,7 +305,7 @@
/** Return all clauses. */
public Clause[] getClauses() {
- return (Clause[])clauses.toArray(CLAUSES_PROTO);
+ return clauses.toArray(CLAUSES_PROTO);
}
/** Add a required term in the default field. */
@@ -361,7 +361,7 @@
public void write(DataOutput out) throws IOException {
out.writeByte(clauses.size());
for (int i = 0; i < clauses.size(); i++)
- ((Clause)clauses.get(i)).write(out);
+ clauses.get(i).write(out);
}
public static Query read(DataInput in, Configuration conf) throws IOException {
@@ -404,7 +404,7 @@
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
- clone.clauses = (ArrayList)clauses.clone();
+ clone.clauses = (ArrayList<Clause>)clauses.clone();
return clone;
}
@@ -412,9 +412,9 @@
/** Flattens a query into the set of text terms that it contains. These are
* terms which should be higlighted in matching documents. */
public String[] getTerms() {
- ArrayList result = new ArrayList();
+ ArrayList<String> result = new ArrayList<String>();
for (int i = 0; i < clauses.size(); i++) {
- Clause clause = (Clause)clauses.get(i);
+ Clause clause = clauses.get(i);
if (!clause.isProhibited()) {
if (clause.isPhrase()) {
Term[] terms = clause.getPhrase().getTerms();
@@ -426,7 +426,7 @@
}
}
}
- return (String[])result.toArray(new String[result.size()]);
+ return result.toArray(new String[result.size()]);
}
/**
@@ -457,7 +457,7 @@
for (int i = 0; i < clauses.length; i++) {
Clause c = clauses[i];
if (!new QueryFilters(conf).isField(c.getField())) { // unknown field
- ArrayList terms = new ArrayList(); // add name to query
+ ArrayList<Term> terms = new ArrayList<Term>(); // add name to query
if (c.isPhrase()) {
terms.addAll(Arrays.asList(c.getPhrase().getTerms()));
} else {
@@ -467,7 +467,7 @@
c = (Clause)c.clone();
c.field = Clause.DEFAULT_FIELD; // use default field instead
c.termOrPhrase
- = new Phrase((Term[])terms.toArray(new Term[terms.size()]));
+ = new Phrase(terms.toArray(new Term[terms.size()]));
}
output.clauses.add(c); // copy clause to output
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryException.java Mon Jan 12 05:26:16 2009
@@ -17,6 +17,7 @@
package org.apache.nutch.searcher;
+@SuppressWarnings("serial")
public class QueryException extends java.io.IOException {
public QueryException(String message) {
super(message);
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/QueryFilters.java Mon Jan 12 05:26:16 2009
@@ -40,14 +40,14 @@
private static final Log LOG = LogFactory.getLog(QueryFilters.class);
private QueryFilter[] queryFilters;
- private HashSet FIELD_NAMES ;
- private HashSet RAW_FIELD_NAMES;
+ private HashSet<String> FIELD_NAMES ;
+ private HashSet<String> RAW_FIELD_NAMES;
- private static ArrayList parseFieldNames(Extension extension,
+ private static List<String> parseFieldNames(Extension extension,
String attribute) {
String fields = extension.getAttribute(attribute);
if (fields == null) fields = "";
- return Collections.list(new StringTokenizer(fields, " ,\t\n\r"));
+ return Arrays.asList(fields.split("[,\\s]"));
}
public QueryFilters(Configuration conf) {
@@ -61,13 +61,14 @@
if (point == null)
throw new RuntimeException(QueryFilter.X_POINT_ID + " not found.");
Extension[] extensions = point.getExtensions();
- FIELD_NAMES = new HashSet();
- RAW_FIELD_NAMES = new HashSet();
+ FIELD_NAMES = new HashSet<String>();
+ RAW_FIELD_NAMES = new HashSet<String>();
QueryFilter[] filters = new QueryFilter[extensions.length];
for (int i = 0; i < extensions.length; i++) {
Extension extension = extensions[i];
- ArrayList fieldNames = parseFieldNames(extension, "fields");
- ArrayList rawFieldNames = parseFieldNames(extension, "raw-fields");
+ List<String> fieldNames = parseFieldNames(extension, "fields");
+ List<String> rawFieldNames =
+ parseFieldNames(extension, "raw-fields");
if (fieldNames.size() == 0 && rawFieldNames.size() == 0) {
if (LOG.isWarnEnabled()) {
LOG.warn("QueryFilter: " + extension.getId()
@@ -90,8 +91,8 @@
.getName());
} else {
// cache already filled
- FIELD_NAMES = (HashSet) objectCache.getObject("FIELD_NAMES");
- RAW_FIELD_NAMES = (HashSet) objectCache.getObject("RAW_FIELD_NAMES");
+ FIELD_NAMES = (HashSet<String>) objectCache.getObject("FIELD_NAMES");
+ RAW_FIELD_NAMES = (HashSet<String>) objectCache.getObject("RAW_FIELD_NAMES");
}
}
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,7 @@
+package org.apache.nutch.searcher;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+public interface RPCSearchBean extends SearchBean, VersionedProtocol {
+
+}
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/RPCSegmentBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,7 @@
+package org.apache.nutch.searcher;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+public interface RPCSegmentBean extends SegmentBean, VersionedProtocol {
+
+}
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,12 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Front-end search interface: combines query execution ({@link Searcher})
+ * with per-hit detail lookup ({@link HitDetailer}).
+ */
+public interface SearchBean extends Searcher, HitDetailer {
+ public static final Log LOG = LogFactory.getLog(SearchBean.class);
+
+ /** Liveness check: returns true if the search backend is reachable and healthy. */
+ public boolean ping() throws IOException ;
+}
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SegmentBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,8 @@
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+
+/**
+ * Access to segment-level data for search results: raw content
+ * ({@link HitContent}) and summaries ({@link HitSummarizer}).
+ */
+public interface SegmentBean extends HitContent, HitSummarizer {
+
+ /** Returns the names of the segments served by this bean. */
+ public String[] getSegmentNames() throws IOException;
+}
Added: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java?rev=733738&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/SolrSearchBean.java Mon Jan 12 05:26:16 2009
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nutch.searcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.BooleanClause;
+import org.apache.lucene.search.BooleanQuery;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.util.ToStringUtils;
+import org.apache.nutch.indexer.solr.SolrWriter;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServer;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.SolrQuery.ORDER;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+
+/**
+ * A {@link SearchBean} backed by a remote SOLR server, accessed over HTTP
+ * via SolrJ. Nutch queries are first rewritten through the configured
+ * {@link QueryFilters} chain and then translated into a SOLR query string
+ * (see {@link #stringify(BooleanQuery)}).
+ */
+public class SolrSearchBean implements SearchBean {
+
+ public static final Log LOG = LogFactory.getLog(SolrSearchBean.class);
+
+ // SolrJ client used for all server round trips.
+ private final SolrServer solr;
+
+ // Nutch query-filter chain that rewrites user queries before submission.
+ private final QueryFilters filters;
+
+ /**
+ * @param conf Nutch/Hadoop configuration, used to build the query filters
+ * @param solrServer base URL of the SOLR server
+ * (e.g. "http://host:8983/solr")
+ * @throws IOException if the SolrJ client cannot be constructed
+ */
+ public SolrSearchBean(Configuration conf, String solrServer)
+ throws IOException {
+ solr = new CommonsHttpSolrServer(solrServer);
+ filters = new QueryFilters(conf);
+ }
+
+ /** Score explanations are not implemented for the SOLR backend. */
+ public String getExplanation(Query query, Hit hit) throws IOException {
+ return "SOLR backend does not support explanations yet.";
+ }
+
+ /**
+ * Runs a filtered query against SOLR and converts the response into
+ * Nutch {@link Hits}.
+ *
+ * @param numHits maximum number of hits to return (SOLR "rows")
+ * @param dedupField field whose value is carried on each hit for dedup
+ * @param sortField field to sort on, or null for relevance ("score")
+ * @param reverse if true, ascending sort order is requested
+ * (NOTE(review): reverse maps to ORDER.asc below - confirm
+ * this matches the other Searcher implementations)
+ */
+ @SuppressWarnings("unchecked")
+ public Hits search(Query query, int numHits, String dedupField,
+ String sortField, boolean reverse)
+ throws IOException {
+
+ // filter query string
+ final BooleanQuery bQuery = filters.filter(query);
+
+ final SolrQuery solrQuery = new SolrQuery(stringify(bQuery));
+
+ solrQuery.setRows(numHits);
+
+ // With no explicit sort, SOLR's default relevance ordering is used;
+ // sortField is reassigned so the per-hit sort value below is read
+ // from the "score" pseudo-field.
+ if (sortField == null) {
+ solrQuery.setFields(dedupField, "score", "id");
+ sortField = "score";
+ } else {
+ solrQuery.setFields(dedupField, sortField, "id");
+ solrQuery.setSortField(sortField, reverse ? ORDER.asc : ORDER.desc);
+ }
+
+ QueryResponse response;
+ try {
+ response = solr.query(solrQuery);
+ } catch (final SolrServerException e) {
+ // surface SOLR failures as IOException, per the Searcher contract
+ throw SolrWriter.makeIOException(e);
+ }
+
+ final SolrDocumentList docList = response.getResults();
+
+ // Wrap each returned document as a Hit keyed by its "id" field,
+ // carrying a typed sort value and the dedup-field value.
+ final Hit[] hitArr = new Hit[docList.size()];
+ for (int i = 0; i < hitArr.length; i++) {
+ final SolrDocument solrDoc = docList.get(i);
+
+ final Object raw = solrDoc.getFirstValue(sortField);
+ WritableComparable sortValue;
+
+ if (raw instanceof Integer) {
+ sortValue = new IntWritable(((Integer)raw).intValue());
+ } else if (raw instanceof Float) {
+ sortValue = new FloatWritable(((Float)raw).floatValue());
+ } else if (raw instanceof String) {
+ sortValue = new Text((String)raw);
+ } else if (raw instanceof Long) {
+ sortValue = new LongWritable(((Long)raw).longValue());
+ } else {
+ // also triggered when the sort field is absent (raw == null)
+ throw new RuntimeException("Unknown sort value type!");
+ }
+
+ final String dedupValue = (String) solrDoc.getFirstValue(dedupField);
+
+ final String uniqueKey = (String )solrDoc.getFirstValue("id");
+
+ hitArr[i] = new Hit(uniqueKey, sortValue, dedupValue);
+ }
+
+ // getNumFound() is the total match count, not just this page's size
+ return new Hits(docList.getNumFound(), hitArr);
+ }
+
+ /**
+ * Fetches the stored fields of a single hit by its unique "id".
+ * Returns null if the document no longer exists in the index.
+ */
+ public HitDetails getDetails(Hit hit) throws IOException {
+ QueryResponse response;
+ try {
+ response = solr.query(new SolrQuery("id:\"" + hit.getUniqueKey() + "\""));
+ } catch (final SolrServerException e) {
+ throw SolrWriter.makeIOException(e);
+ }
+
+ final SolrDocumentList docList = response.getResults();
+ if (docList.getNumFound() == 0) {
+ return null;
+ }
+
+ return buildDetails(docList.get(0));
+ }
+
+ /**
+ * Fetches details for several hits in a single SOLR round trip, using
+ * one query of the form ( id:"a" id:"b" ... ).
+ * NOTE(review): assumes the default query operator is OR and that ids
+ * contain no '"' characters - confirm against the SOLR schema/config.
+ *
+ * @throws RuntimeException if the server returns fewer documents than hits
+ */
+ public HitDetails[] getDetails(Hit[] hits) throws IOException {
+ final StringBuilder buf = new StringBuilder();
+ buf.append("(");
+ for (final Hit hit : hits) {
+ buf.append(" id:\"");
+ buf.append(hit.getUniqueKey());
+ buf.append("\"");
+ }
+ buf.append(")");
+
+ QueryResponse response;
+ try {
+ response = solr.query(new SolrQuery(buf.toString()));
+ } catch (final SolrServerException e) {
+ throw SolrWriter.makeIOException(e);
+ }
+
+ final SolrDocumentList docList = response.getResults();
+ if (docList.size() < hits.length) {
+ throw new RuntimeException("Missing hit details! Found: " +
+ docList.size() + ", expecting: " +
+ hits.length);
+ }
+
+ /* Response returned from SOLR server may be out of
+ * order. So we make sure that nth element of HitDetails[]
+ * is the detail of nth hit.
+ */
+ final Map<String, HitDetails> detailsMap =
+ new HashMap<String, HitDetails>(hits.length);
+ for (final SolrDocument solrDoc : docList) {
+ final HitDetails details = buildDetails(solrDoc);
+ detailsMap.put(details.getValue("id"), details);
+ }
+
+ final HitDetails[] detailsArr = new HitDetails[hits.length];
+ for (int i = 0; i < hits.length; i++) {
+ detailsArr[i] = detailsMap.get(hits[i].getUniqueKey());
+ }
+
+ return detailsArr;
+ }
+
+ /** Pings the SOLR server; true iff it responds with status 0. */
+ public boolean ping() throws IOException {
+ try {
+ return solr.ping().getStatus() == 0;
+ } catch (final SolrServerException e) {
+ throw SolrWriter.makeIOException(e);
+ }
+ }
+
+ /** Nothing to release: the HTTP client holds no closeable state here. */
+ public void close() throws IOException { }
+
+ /**
+ * Flattens a SOLR document into parallel field-name/value arrays;
+ * multi-valued fields contribute one (name, value) pair per value.
+ */
+ private static HitDetails buildDetails(SolrDocument solrDoc) {
+ final List<String> fieldList = new ArrayList<String>();
+ final List<String> valueList = new ArrayList<String>();
+ for (final String field : solrDoc.getFieldNames()) {
+ for (final Object o : solrDoc.getFieldValues(field)) {
+ fieldList.add(field);
+ valueList.add(o.toString());
+ }
+ }
+
+ final String[] fields = fieldList.toArray(new String[fieldList.size()]);
+ final String[] values = valueList.toArray(new String[valueList.size()]);
+ return new HitDetails(fields, values);
+ }
+
+ /* Hackish solution for stringifying queries. Code from BooleanQuery.
+ * This is necessary because a BooleanQuery.toString produces
+ * statements like feed:http://www.google.com which doesn't work, we
+ * need feed:"http://www.google.com".
+ */
+ private static String stringify(BooleanQuery bQuery) {
+ final StringBuilder buffer = new StringBuilder();
+ final boolean needParens=(bQuery.getBoost() != 1.0) ||
+ (bQuery.getMinimumNumberShouldMatch()>0) ;
+ if (needParens) {
+ buffer.append("(");
+ }
+
+ final BooleanClause[] clauses = bQuery.getClauses();
+ int i = 0;
+ for (final BooleanClause c : clauses) {
+ // prefix each clause with its occurrence operator
+ if (c.isProhibited())
+ buffer.append("-");
+ else if (c.isRequired())
+ buffer.append("+");
+
+ final org.apache.lucene.search.Query subQuery = c.getQuery();
+ if (subQuery instanceof BooleanQuery) { // wrap sub-bools in parens
+ buffer.append("(");
+ buffer.append(c.getQuery().toString(""));
+ buffer.append(")");
+ } else if (subQuery instanceof TermQuery) {
+ // quote term values so URLs etc. survive SOLR query parsing
+ final Term term = ((TermQuery) subQuery).getTerm();
+ buffer.append(term.field());
+ buffer.append(":\"");
+ buffer.append(term.text());
+ buffer.append("\"");
+ } else {
+ buffer.append(" ");
+ buffer.append(c.getQuery().toString());
+ }
+
+ // space-separate all but the last clause
+ if (i++ != clauses.length - 1) {
+ buffer.append(" ");
+ }
+ }
+
+ if (needParens) {
+ buffer.append(")");
+ }
+
+ if (bQuery.getMinimumNumberShouldMatch()>0) {
+ buffer.append('~');
+ buffer.append(bQuery.getMinimumNumberShouldMatch());
+ }
+
+ if (bQuery.getBoost() != 1.0f) {
+ buffer.append(ToStringUtils.boost(bQuery.getBoost()));
+ }
+
+ return buffer.toString();
+ }
+
+}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/searcher/Summary.java Mon Jan 12 05:26:16 2009
@@ -88,7 +88,7 @@
public boolean isEllipsis() { return true; }
}
- private ArrayList fragments = new ArrayList();
+ private ArrayList<Fragment> fragments = new ArrayList<Fragment>();
private static final Fragment[] FRAGMENT_PROTO = new Fragment[0];
@@ -100,7 +100,7 @@
/** Returns an array of all of this summary's fragments.*/
public Fragment[] getFragments() {
- return (Fragment[])fragments.toArray(FRAGMENT_PROTO);
+ return fragments.toArray(FRAGMENT_PROTO);
}
/** Returns a String representation of this Summary. */
@@ -126,7 +126,7 @@
Fragment fragment = null;
StringBuffer buf = new StringBuffer();
for (int i=0; i<fragments.size(); i++) {
- fragment = (Fragment) fragments.get(i);
+ fragment = fragments.get(i);
if (fragment.isHighlight()) {
buf.append("<span class=\"highlight\">")
.append(encode ? Entities.encode(fragment.getText())
@@ -185,7 +185,7 @@
out.writeInt(fragments.size());
Fragment fragment = null;
for (int i=0; i<fragments.size(); i++) {
- fragment = (Fragment) fragments.get(i);
+ fragment = fragments.get(i);
if (fragment.isHighlight()) {
out.writeByte(HIGHLIGHT);
Text.writeString(out, fragment.getText());
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/servlet/Cached.java Mon Jan 12 05:26:16 2009
@@ -77,7 +77,7 @@
}
Hit hit = new Hit(Integer.parseInt(request.getParameter("idx")),
- Integer.parseInt(request.getParameter("id")));
+ request.getParameter("id"));
HitDetails details = bean.getDetails(hit);
// raw bytes
Modified: lucene/nutch/trunk/src/plugin/build.xml
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/plugin/build.xml?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/plugin/build.xml (original)
+++ lucene/nutch/trunk/src/plugin/build.xml Mon Jan 12 05:26:16 2009
@@ -91,7 +91,7 @@
<!-- Test all of the plugins. -->
<!-- ====================================================== -->
<target name="test">
- <parallel threadCount="2">
+ <!--<parallel threadCount="2">-->
<ant dir="creativecommons" target="test"/>
<ant dir="languageidentifier" target="test"/>
<ant dir="lib-http" target="test"/>
@@ -118,7 +118,7 @@
<ant dir="urlnormalizer-basic" target="test"/>
<ant dir="urlnormalizer-pass" target="test"/>
<ant dir="urlnormalizer-regex" target="test"/>
- </parallel>
+ <!--</parallel>-->
</target>
<!-- ====================================================== -->
Modified: lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java
URL: http://svn.apache.org/viewvc/lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java?rev=733738&r1=733737&r2=733738&view=diff
==============================================================================
--- lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java (original)
+++ lucene/nutch/trunk/src/plugin/creativecommons/src/java/org/creativecommons/nutch/CCIndexingFilter.java Mon Jan 12 05:26:16 2009
@@ -17,20 +17,19 @@
package org.creativecommons.nutch;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field;
import org.apache.nutch.metadata.CreativeCommons;
import org.apache.nutch.parse.Parse;
import org.apache.nutch.indexer.IndexingFilter;
import org.apache.nutch.indexer.IndexingException;
+import org.apache.nutch.indexer.NutchDocument;
+import org.apache.nutch.indexer.lucene.LuceneWriter;
import org.apache.hadoop.io.Text;
import org.apache.nutch.crawl.CrawlDatum;
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.metadata.Metadata;
-import org.apache.nutch.metadata.CreativeCommons;
import org.apache.hadoop.conf.Configuration;
@@ -50,7 +49,7 @@
private Configuration conf;
- public Document filter(Document doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
+ public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
throws IndexingException {
Metadata metadata = parse.getData().getParseMeta();
@@ -86,7 +85,7 @@
/** Add the features represented by a license URL. Urls are of the form
* "http://creativecommons.org/licenses/xx-xx/xx/xx", where "xx" names a
* license feature. */
- public void addUrlFeatures(Document doc, String urlString) {
+ public void addUrlFeatures(NutchDocument doc, String urlString) {
try {
URL url = new URL(urlString);
@@ -108,8 +107,12 @@
}
}
- private void addFeature(Document doc, String feature) {
- doc.add(new Field(FIELD, feature, Field.Store.YES, Field.Index.UN_TOKENIZED));
+ private void addFeature(NutchDocument doc, String feature) {
+ doc.add(FIELD, feature);
+ }
+
+ public void addIndexBackendOptions(Configuration conf) {
+ LuceneWriter.addFieldOptions(FIELD, LuceneWriter.STORE.YES, LuceneWriter.INDEX.UNTOKENIZED, conf);
}
public void setConf(Configuration conf) {