Posted to common-user@hadoop.apache.org by Ning Li <ni...@gmail.com> on 2008/02/06 19:59:51 UTC

Lucene-based Distributed Index Leveraging Hadoop

There have been several proposals for a Lucene-based distributed index
architecture.
 1) Doug Cutting's "Index Server Project Proposal" at
    http://www.mail-archive.com/general@lucene.apache.org/msg00338.html
 2) Solr's "Distributed Search" at
    http://wiki.apache.org/solr/DistributedSearch
 3) Mark Butler's "Distributed Lucene" at
    http://wiki.apache.org/hadoop/DistributedLucene

We have also been working on a Lucene-based distributed index architecture.
Our design differs from the above proposals in the way it leverages Hadoop
as much as possible. In particular, HDFS is used to reliably store Lucene
instances, Map/Reduce is used to analyze documents and update Lucene instances
in parallel, and Hadoop's IPC framework is used. Our design is geared for
applications that require a highly scalable index and where batch updates
to each Lucene instance are acceptable (versus finer-grained
document-at-a-time updates).

We have a working implementation of our design and are in the process
of evaluating its performance. An overview of our design is provided below.
We welcome feedback and would like to know if you are interested in working
on it. If so, we would be happy to make the code publicly available. At the
same time, we would like to collaborate with people working on existing
proposals and see if we can consolidate our efforts.

TERMINOLOGY
A distributed "index" is partitioned into "shards". Each shard corresponds to
a Lucene instance and contains a disjoint subset of the documents in the index.
Each shard is stored in HDFS and served by one or more "shard servers". Here
we only talk about a single distributed index, but in practice multiple indexes
can be supported.

A "master" keeps track of the shard servers and the shards being served by
them. An "application" updates and queries the global index through an
"index client". An index client communicates with the shard servers to
execute a query.

KEY RPC METHODS
This section lists the key RPC methods in our design. To simplify the
discussion, some of their parameters have been omitted.

  On the Shard Servers
    // Execute a query on this shard server's Lucene instance.
    // This method is called by an index client.
    SearchResults search(Query query);

  On the Master
    // Tell the master to update the shards, i.e., Lucene instances.
    // This method is called by an index client.
    boolean updateShards(Configuration conf);

    // Ask the master where the shards are located.
    // This method is called by an index client.
    LocatedShards getShardLocations();

    // Send a heartbeat to the master. This method is called by a
    // shard server. In the response, the master informs the
    // shard server when to switch to a newer version of the index.
    ShardServerCommand sendHeartbeat();

QUERYING THE INDEX
To query the index, an application sends a search request to an index client.
The index client then calls the shard server search() method for each shard
of the index, merges the results and returns them to the application. The
index client caches the mapping between shards and shard servers by
periodically calling the master's getShardLocations() method.
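
A minimal sketch of the merge step, assuming each shard returns hits whose
scores are comparable across shards. The Hit type, its fields, and the
mergeTopK name are hypothetical and are not taken from the implementation
described here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: merge per-shard results into a global top-k list.
class ShardResultMerger {
    // One search hit; the fields are assumptions for illustration only.
    static final class Hit {
        final String shard;
        final int docId;
        final float score;

        Hit(String shard, int docId, float score) {
            this.shard = shard;
            this.docId = docId;
            this.score = score;
        }
    }

    // Combine the hits returned by every shard and keep the k best by score.
    static List<Hit> mergeTopK(List<List<Hit>> perShard, int k) {
        List<Hit> all = new ArrayList<>();
        for (List<Hit> hits : perShard) {
            all.addAll(hits);
        }
        // Highest score first; the sort is stable, so ties keep shard order.
        all.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return new ArrayList<>(all.subList(0, Math.min(k, all.size())));
    }
}
```

An index client would call something like mergeTopK once per query, after
collecting the search() responses from one shard server per shard.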

UPDATING THE INDEX USING MAP/REDUCE
To update the index, an application sends an update request to an index client.
The index client then calls the master's updateShards() method, which schedules
a Map/Reduce job to update the index. The Map/Reduce job updates the shards in
parallel and copies the new index files of each shard (i.e., Lucene instance)
to HDFS.

The updateShards() method includes a "configuration", which provides
information for updating the shards. More specifically, the configuration
includes the following information:
  - Input path. This provides the location of updated documents, e.g., HDFS
    files or directories, or HBase tables.
  - Input formatter. This specifies how to format the input documents.
  - Analysis. This defines the analyzer to use on the input. The analyzer
    determines whether a document is being inserted, updated, or deleted. For
    inserts or updates, the analyzer also converts each input document into
    a Lucene document.

The Map phase of the Map/Reduce job formats and analyzes the input (in
parallel), while the Reduce phase collects and applies the updates to each
Lucene instance (again in parallel). The updates are applied using the local
file system where a Reduce task runs and then copied back to HDFS. For example,
if the updates caused a new Lucene segment to be created, the new segment
would be created on the local file system first, and then copied back to HDFS.
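
The division of work between the two phases can be sketched in plain Java.
The overview above does not say how a document is assigned to a shard, so the
hash partitioning below is purely an assumption, and UpdateRouting, Update,
shardFor and groupByShard are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: route analyzed updates to shards, then group them
// so that each shard's updates can be applied together.
class UpdateRouting {
    enum Op { INSERT, UPDATE, DELETE }

    // Result of analyzing one input record (assumed shape).
    static final class Update {
        final Op op;
        final String docId;

        Update(Op op, String docId) {
            this.op = op;
            this.docId = docId;
        }
    }

    // "Map" side: choose the shard for a document.
    // ASSUMPTION: hash partitioning on the document id.
    static int shardFor(String docId, int numShards) {
        return Math.floorMod(docId.hashCode(), numShards);
    }

    // "Reduce" side: collect the updates destined for each shard.
    static Map<Integer, List<Update>> groupByShard(List<Update> updates,
                                                   int numShards) {
        Map<Integer, List<Update>> byShard = new TreeMap<>();
        for (Update u : updates) {
            byShard.computeIfAbsent(shardFor(u.docId, numShards),
                                    k -> new ArrayList<>()).add(u);
        }
        return byShard;
    }
}
```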

When the Map/Reduce job completes, a "new version" of the index is ready to be
queried. It is important to note that the new version of the index is not
derived from scratch. By leveraging Lucene's update algorithm, the new version
of each Lucene instance will share as many files as possible with the previous
version.

ENSURING INDEX CONSISTENCY
At any point in time, an index client always has a consistent view of the
shards in the index. The results of a search query include either all or none
of a recent update to the index. The details of the algorithm to accomplish
this are omitted here, but the basic flow is pretty simple.

After the Map/Reduce job to update the shards completes, the master will tell
each shard server to "prepare" the new version of the index. After all the
shard servers have responded affirmatively to the "prepare" message, the new
index is ready to be queried. An index client will then lazily learn about
the new index when it makes its next getShardLocations() call to the master.
In essence, a lazy two-phase commit protocol is used, with "prepare" and
"commit" messages piggybacked on heartbeats. After a shard has switched to
the new index, the Lucene files in the old index that are no longer needed
can safely be deleted.
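
Since the details of the algorithm are omitted, the following is only a
sketch of how "prepare" and "commit" could ride on heartbeat responses. The
VersionCoordinator class, its Command values, and the two version arguments
to heartbeat() are assumptions made for illustration; the real
sendHeartbeat() signature is not shown in this overview.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of a master that piggybacks "prepare" and "commit"
// on heartbeat responses. All names and message shapes are assumptions.
class VersionCoordinator {
    enum Command { NONE, PREPARE, COMMIT }

    private final Set<String> servers;                    // known shard servers
    private final Set<String> prepared = new HashSet<>(); // acked "prepare"
    private int committed; // version that index clients are told to query
    private int pending;   // version being rolled out (== committed if none)

    VersionCoordinator(Set<String> servers, int initialVersion) {
        this.servers = new HashSet<>(servers);
        this.committed = initialVersion;
        this.pending = initialVersion;
    }

    // Called once the Map/Reduce update job has written newVersion to HDFS.
    synchronized void updateFinished(int newVersion) {
        pending = newVersion;
        prepared.clear();
    }

    // Version reported to index clients through getShardLocations().
    synchronized int queryableVersion() {
        return committed;
    }

    // Heartbeat from a shard server, reporting the newest version it has
    // prepared and the version it currently serves.
    synchronized Command heartbeat(String server, int preparedVersion,
                                   int servingVersion) {
        if (pending != committed) {          // a rollout is in progress
            if (preparedVersion != pending) {
                return Command.PREPARE;      // phase one: ask it to prepare
            }
            prepared.add(server);
            if (prepared.containsAll(servers)) {
                committed = pending;         // every server is ready
                return Command.COMMIT;
            }
            return Command.NONE;             // still waiting for the others
        }
        // No rollout in progress: tell stragglers to switch over.
        return servingVersion < committed ? Command.COMMIT : Command.NONE;
    }
}
```

In this sketch a server that has prepared the new version keeps serving the
old one until it is told to commit, and clients only see the new version
number after every server has acknowledged "prepare".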

ACHIEVING FAULT-TOLERANCE
We rely on the fault-tolerance of Map/Reduce to guarantee that an index update
will eventually succeed. All shards are stored in HDFS and can be read by any
shard server in a cluster. For a given shard, if one of its shard servers dies,
new search requests are handled by its surviving shard servers. To ensure that
there is always enough coverage for a shard, the master will instruct other
shard servers to take over the shards of a dead shard server.
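
One way the master could restore coverage is sketched below. The policy
shown (drop dead servers, then top each shard up to a target replica count
using the least-loaded live server) is an assumption, as are the
ShardReassigner and reassign names; the overview does not specify how
replacement servers are chosen.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: recompute shard-to-server assignments after failures.
class ShardReassigner {
    // Returns a new assignment (shard -> servers) in which dead servers are
    // dropped and each shard is topped up to `replicas` live servers.
    static Map<String, Set<String>> reassign(Map<String, Set<String>> assignment,
                                             Set<String> live, int replicas) {
        // Number of shards each live server is serving.
        Map<String, Integer> load = new TreeMap<>();
        for (String s : live) {
            load.put(s, 0);
        }
        Map<String, Set<String>> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : assignment.entrySet()) {
            Set<String> kept = new TreeSet<>(e.getValue());
            kept.retainAll(live);            // forget dead shard servers
            for (String s : kept) {
                load.merge(s, 1, Integer::sum);
            }
            result.put(e.getKey(), kept);
        }
        for (Set<String> holders : result.values()) {
            while (holders.size() < replicas) {
                String best = null;          // least-loaded server not yet used
                for (String s : load.keySet()) {
                    if (!holders.contains(s)
                            && (best == null || load.get(s) < load.get(best))) {
                        best = s;
                    }
                }
                if (best == null) {
                    break;                   // not enough live servers
                }
                holders.add(best);
                load.merge(best, 1, Integer::sum);
            }
        }
        return result;
    }
}
```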

PERFORMANCE ISSUES
Currently, each shard server reads a shard directly from HDFS. Experiments
have shown that this approach does not perform very well, with HDFS causing
Lucene to slow down fairly dramatically (by well over 5x when data blocks are
accessed over the network). Consequently, we are exploring different ways to
leverage the fault tolerance of HDFS and, at the same time, work around its
performance problems. One simple alternative is to add a local file system
cache on each shard server. Another alternative is to modify HDFS so that an
application has more control over where to store the primary and replicas of
an HDFS block. This feature may be useful for other HDFS applications (e.g.,
HBase). We would like to collaborate with other people who are interested in
adding this feature to HDFS.


Regards,
Ning Li

RE: sort by value

Posted by Qiong Zhang <ja...@yahoo-inc.com>.
Thank you all for the reply. 

Looks like the class KeyFieldBasedPartitioner in
org.apache.hadoop.mapred.lib can be used in Hadoop streaming to sort
both key (like primary key) and value (like secondary key) without data
duplication.

It would be useful to have the same functionality in the native Java API.

James
-----Original Message-----
From: Ted Dunning [mailto:tdunning@veoh.com] 
Sent: Wednesday, February 06, 2008 1:53 PM
To: core-user@hadoop.apache.org
Subject: Re: sort by value




On 2/6/08 11:58 AM, "Joydeep Sen Sarma" <js...@facebook.com> wrote:

> 
>> But it actually adds duplicate data (i.e., the value column which needs
>> sorting) to the key.
> 
> Why? U can always take it out of the value to remove the redundancy.
> 

Actually, you can't in most cases.

Suppose you have input data like this:

   a, b_1
   a, b_2
   a, b_1

And then the mapper produces data like this for each input record:

   a, b_1, 1
   a, *, 1
   a, b_2, 1
   a, *, 1
   a, b_1, 1
   a, *, 1

If you use the first two fields as the key so that you can sort the records
nicely, you get the following inputs to the reducer

   <a, *>, [3, 2, 1]

You now don't know what the counts go to except for the first one.  If you
replicate the second field in the value output of the map, then you get this

   <a, *>, [[*,3], [b_1, 2], [b_2, 1]]

And you can produce the desired output:

   a, b_1, 2/3
   a, b_2, 1/3


Re: sort by value

Posted by Owen O'Malley <oo...@yahoo-inc.com>.
On Feb 6, 2008, at 2:52 PM, Joydeep Sen Sarma wrote:

> ok - got it. This seems to be a subtle drawback in the reduce api. Keys
> in the same reduce group may differ - but the reduce api does not make
> it possible for the reducer function to get access to each key. It only
> gets access to the starting key value for that group.

With a context object api, it would be:

public interface Reducer<KeyIn,ValueIn,KeyOut,ValueOut> extends Closeable {
   void reduce(ReducerContext<KeyIn,ValueIn,KeyOut,ValueOut> context
               ) throws IOException, InterruptedException;
}

and the ReducerContext would have:

KeyIn getKey();

to get the key. Then your assumption would work because it would be  
the current key. Now if only I had time to work on HADOOP-1230. *smile*

-- Owen

RE: sort by value

Posted by Joydeep Sen Sarma <js...@facebook.com>.
ok - got it. This seems to be a subtle drawback in the reduce api. Keys
in the same reduce group may differ - but the reduce api does not make
it possible for the reducer function to get access to each key. It only
gets access to the starting key value for that group.

If the api was instead:

class KVpair { WritableComparable key; Writable value; }
void reduce(WritableComparable groupKey, Iterator<KVpair> keyvalues)

then we would be in good shape (since we can see the key and the value
and don't have to duplicate any data across them).

The underlying iterator has access to this data - it's just not
available through the api.

I suspect that these kinds of small optimizations are too complex to
make for a one-time job - but for any query language on top of hadoop -
it's a one time effort and probably worth it.

joydeep

-----Original Message-----
From: Ted Dunning [mailto:tdunning@veoh.com] 
Sent: Wednesday, February 06, 2008 1:53 PM
To: core-user@hadoop.apache.org
Subject: Re: sort by value




On 2/6/08 11:58 AM, "Joydeep Sen Sarma" <js...@facebook.com> wrote:

> 
>> But it actually adds duplicate data (i.e., the value column which needs
>> sorting) to the key.
> 
> Why? U can always take it out of the value to remove the redundancy.
> 

Actually, you can't in most cases.

Suppose you have input data like this:

   a, b_1
   a, b_2
   a, b_1

And then the mapper produces data like this for each input record:

   a, b_1, 1
   a, *, 1
   a, b_2, 1
   a, *, 1
   a, b_1, 1
   a, *, 1

If you use the first two fields as the key so that you can sort the records
nicely, you get the following inputs to the reducer

   <a, *>, [3, 2, 1]

You now don't know what the counts go to except for the first one.  If you
replicate the second field in the value output of the map, then you get this

   <a, *>, [[*,3], [b_1, 2], [b_2, 1]]

And you can produce the desired output:

   a, b_1, 2/3
   a, b_2, 1/3


Re: sort by value

Posted by Ted Dunning <td...@veoh.com>.


On 2/6/08 11:58 AM, "Joydeep Sen Sarma" <js...@facebook.com> wrote:

> 
>> But it actually adds duplicate data (i.e., the value column which needs
>> sorting) to the key.
> 
> Why? U can always take it out of the value to remove the redundancy.
> 

Actually, you can't in most cases.

Suppose you have input data like this:

   a, b_1
   a, b_2
   a, b_1

And then the mapper produces data like this for each input record:

   a, b_1, 1
   a, *, 1
   a, b_2, 1
   a, *, 1
   a, b_1, 1
   a, *, 1

If you use the first two fields as the key so that you can sort the records
nicely, you get the following inputs to the reducer

   <a, *>, [3, 2, 1]

You now don't know what the counts go to except for the first one.  If you
replicate the second field in the value output of the map, then you get this

   <a, *>, [[*,3], [b_1, 2], [b_2, 1]]

And you can produce the desired output:

   a, b_1, 2/3
   a, b_2, 1/3
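
The reducer side of this example can be sketched as follows, assuming the
values arrive already sorted with the "*" total first and with the second
field replicated into each value, as in the [[*,3], [b_1, 2], [b_2, 1]] list
above. ConditionalProbReducer and FieldCount are hypothetical names, and the
code is plain Java rather than the Hadoop Reducer interface.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: compute k(a,b)/k(a,*) in a single pass over values
// that are sorted so the "*" total arrives first.
class ConditionalProbReducer {
    // A value that carries its own copy of the second key field.
    static final class FieldCount {
        final String field;
        final long count;

        FieldCount(String field, long count) {
            this.field = field;
            this.count = count;
        }
    }

    // Emits one "a, b, k(a,b)/k(a,*)" line per non-total value.
    static List<String> reduce(String a, Iterable<FieldCount> values) {
        List<String> out = new ArrayList<>();
        long total = 0;
        for (FieldCount v : values) {
            if (v.field.equals("*")) {
                total = v.count;             // remember k(a,*)
                continue;
            }
            if (total == 0) {
                throw new IllegalStateException("k(a,*) must arrive first");
            }
            out.add(a + ", " + v.field + ", " + v.count + "/" + total);
        }
        return out;
    }
}
```

Because the total is seen first, nothing has to be buffered: each output
line is produced as soon as its value is read.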


RE: sort by value

Posted by Joydeep Sen Sarma <js...@facebook.com>.
> But it actually adds duplicate data (i.e., the value column which needs
> sorting) to the key.

Why? U can always take it out of the value to remove the redundancy.

> Also, I wonder what is the benefit to sort values before reaching
> reducers. It can be achieved in the reduce phase anyway.

The reduce only does a merge of sorted segments. The segments have to be
sorted using all the sort fields before the merge itself. Otherwise u
can't do a merge. (hope I understood the question right)


-----Original Message-----
From: Qiong Zhang [mailto:jamesz@yahoo-inc.com] 
Sent: Wednesday, February 06, 2008 11:25 AM
To: core-user@hadoop.apache.org
Subject: sort by value


Hi, All,

Is there a better way to sort by value in the same key before reaching
reducers?

I know it can be achieved by using
setOutputValueGroupingComparator/setOutputKeyComparatorClass.

But it actually adds duplicate data (i.e., the value column which needs
sorting) to the key.

Also, I wonder what is the benefit to sort values before reaching
reducers. It can be achieved in the reduce phase anyway.

Thanks,
James

Re: sort by value

Posted by Ted Dunning <td...@veoh.com>.

The method you describe is the standard approach.

The benefit is that the data that arrives at the reducer might be larger
than you want to store in memory (for sorting by the reduce).  Also, reading
the entire set of reduce values would increase the amount of data allocated
and would mean that you would need to make two passes over each reduce set
(at least).  Sorting in the shuffle phase is essentially free.

One conventional use of this sorting is to ensure that summary data is
processed before other data.  For instance, if you are estimating
conditional probabilities p(b | a) and you have counts k(a, b) and k(a, *)
then it is nice to reduce on a so that you get k(a, *), k(a, b_1), k(a,
b_2)... as the input to the reducer.  With a simple sort, you can guarantee
that the k(a,*) value comes first which makes it easier to compute
k(a,b)/k(a,*) since you would already have the value of k(a,*) handy.

Another, much more obscure, use is in co-grouping where sorting in random
order can help minimize memory use for temporary buffering as you split the
reduce values into two or more lists or if you implement iterators over
virtual lists.
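
The effect of pairing a sort comparator with a grouping comparator (what
setOutputKeyComparatorClass and setOutputValueGroupingComparator configure)
can be imitated in plain Java. This sketch does not call Hadoop; the
SecondarySortSketch class, its SORT and GROUP comparators, and the
two-element String[] key layout are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: sort on the full composite key (a, b) but group on
// the first field only, so each group starts with its (a, *) summary key.
class SecondarySortSketch {
    // Sort comparator: by first field, then "*" ahead of any other value.
    static final Comparator<String[]> SORT = (x, y) -> {
        int c = x[0].compareTo(y[0]);
        if (c != 0) {
            return c;
        }
        boolean xs = x[1].equals("*");
        boolean ys = y[1].equals("*");
        if (xs != ys) {
            return xs ? -1 : 1;
        }
        return x[1].compareTo(y[1]);
    };

    // Grouping comparator: keys with the same first field share a group.
    static final Comparator<String[]> GROUP = (x, y) -> x[0].compareTo(y[0]);

    // Sort the keys, then split them into the groups a reducer would see.
    static List<List<String[]>> shuffle(List<String[]> keys) {
        List<String[]> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<String[]>> groups = new ArrayList<>();
        for (String[] k : sorted) {
            if (groups.isEmpty()
                    || GROUP.compare(groups.get(groups.size() - 1).get(0), k) != 0) {
                groups.add(new ArrayList<>());
            }
            groups.get(groups.size() - 1).add(k);
        }
        return groups;
    }
}
```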


On 2/6/08 11:24 AM, "Qiong Zhang" <ja...@yahoo-inc.com> wrote:

> 
> Hi, All,
> 
> Is there a better way to sort by value in the same key before reaching
> reducers?
> 
> I know it can be achieved by using
> setOutputValueGroupingComparator/setOutputKeyComparatorClass.
> 
> But it actually adds duplicate data (i.e., the value column which needs
> sorting) to the key.
> 
> Also, I wonder what is the benefit to sort values before reaching
> reducers. It can be achieved in the reduce phase anyway.
> 
> Thanks,
> James


sort by value

Posted by Qiong Zhang <ja...@yahoo-inc.com>.
Hi, All,

Is there a better way to sort by value in the same key before reaching
reducers?

I know it can be achieved by using
setOutputValueGroupingComparator/setOutputKeyComparatorClass.

But it actually adds duplicate data (i.e., the value column which needs
sorting) to the key.

Also, I wonder what is the benefit to sort values before reaching
reducers. It can be achieved in the reduce phase anyway.

Thanks,
James

Re: Lucene-based Distributed Index Leveraging Hadoop

Posted by "Peter W." <pe...@marketingbrokers.com>.
Howdy,

Your work is outstanding and will hopefully be adopted soon.

The HDFS distributed Lucene index solves many of the various
dependencies introduced by achieving this another way using
RMI, HTTP (serialized objects w/servlets) or Tomcat balancing
with mysql databases, schemas and connection pools.

Before this, other mixed options were available where Nutch
obtains documents, HTML and XML parsers extract data, Hadoop
reduces those results and Lucene stores and indexes them.

Something like get document(Nutch), REST post as XML(Solr), XML to
data(ROME,Abdera), data to map(Hadoop), reduce to tables(Hadoop,HBase)
then reconstruct bytes to Lucene Document object for indexing.

Obviously, yours is cleaner and more scalable.

I'd want the master also to keep track of (task[id],[comp]leted,[prog]ress)
in something like a table on which you could perform status updates:

+------+------+------+
| id   | comp | prog |
+------+------+------+

Also, maybe the following indexing pipeline...

index clients:
from remote app machine1,machine2,machine3 using hdfs

batch index lucene documents (hundred at a time)
place in single encapsulation object
connect to master
select task id where (completed=0) && (progress=0)
update progress=1
put object (hdfs)

master:
recreate collection from stream (in)
iterate object, cast items to Document
hash document key in the mapper, contents are IM
index Lucene documents in reducer allowing
Text object access for filtering purposes
return indexed # as integer (rpc response)

back on clients:
update progress=0,comp=1 when finished
send master confirmation info with heartbeat

Then add dates and logic for fixing extended race
conditions where (completed=0) && (progress=1) on
the master where clients can resubmit jobs using
confirmed keys received as inventory lists.

To update progress and completed tasks, somehow
check the size of part-files in each labeled out dir
or monitor Hadoop logs in appropriate temp dir.

Run new JobClients accordingly.

Sweet,

Peter W.






Re: Lucene-based Distributed Index Leveraging Hadoop

Posted by Ning Li <ni...@gmail.com>.
We welcome your input. Discussions are mainly on
general@lucene.apache.org now (a thread with the same title).

On 2/7/08, Dennis Kubes <ku...@apache.org> wrote:
> This is actually something we were planning on building into Nutch.
>
> Dennis

Re: Lucene-based Distributed Index Leveraging Hadoop

Posted by Dennis Kubes <ku...@apache.org>.
This is actually something we were planning on building into Nutch.

Dennis

Ning Li wrote:
> On 2/6/08, Ted Dunning <td...@veoh.com> wrote:
>> Our best work-around is to simply take a shard out of service during delivery
>> of an updated index.  This is obviously not a good solution.
> 
> How many shard servers are serving each shard? If it's more than one,
> you can have the rest of the shard servers sharing the query workload
> while one shard server loads a new version of a shard.
> 
> We'd like to start an open source project for this (or join if one
> already exists) if there is enough interest.

Re: Lucene-based Distributed Index Leveraging Hadoop

Posted by Ted Dunning <td...@veoh.com>.
We have quite a few serving the load, but if we are trying to update
relatively often (say every 30 minutes), then having a server out of action
for several minutes really hurts.  The outage is that long because you have to

A) turn off traffic
B) wait for traffic to actually stop
C) move the multi-gigabyte index to the machine
D) warm up the new index
E) start traffic
F) wait for traffic to actually fully start
G) declare switch-over complete

Depending on your update interval, this can easily eat 30-40% of your
capacity, which seems absurd since a hot search engine rarely tries to read
from disk at all.


On 2/6/08 3:56 PM, "Ning Li" <ni...@gmail.com> wrote:

> How many shard servers are serving each shard? If it's more than one,
> you can have the rest of the shard servers sharing the query workload
> while one shard server loads a new version of a shard.


Re: Lucene-based Distributed Index Leveraging Hadoop

Posted by Ning Li <ni...@gmail.com>.
On 2/6/08, Ted Dunning <td...@veoh.com> wrote:
> Our best work-around is to simply take a shard out of service during delivery
> of an updated index.  This is obviously not a good solution.

How many shard servers are serving each shard? If it's more than one,
you can have the rest of the shard servers sharing the query workload
while one shard server loads a new version of a shard.

We'd like to start an open source project for this (or join if one
already exists) if there is enough interest.

Re: Lucene-based Distributed Index Leveraging Hadoop

Posted by Ted Dunning <td...@veoh.com>.
Very nice summary.

One of the issues that we have had with multiple search servers is that on
Linux, there can be substantial contention for disk I/O.  This means that as
a new index is being written, access to the current index can be stalled for
very long periods of time (sometimes >10s).  This problem is very difficult
to work around even with different I/O schedulers, write throttling and
other measures.  Our best work-around is to simply take a shard out of
service during delivery of an updated index.  This is obviously not a good
solution.

Regarding your comments about control over where the index resides, it may
help in the short run to write the original version of the index from one of
the storage nodes.  That will let you ensure that at least one copy of the
index is local to that machine.  Rebalancing will destroy that locality over
time, of course, so you may have problems there.  It would obviously be
preferable to be able to say to HDFS something like "the following 5 hosts
are preferred locations for replicas of this file" and have that preference
be respected during allocation and rebalancing.


On 2/6/08 10:59 AM, "Ning Li" <ni...@gmail.com> wrote:

> There have been several proposals for a Lucene-based distributed index
> architecture.
>  1) Doug Cutting's "Index Server Project Proposal" at
>     http://www.mail-archive.com/general@lucene.apache.org/msg00338.html
>  2) Solr's "Distributed Search" at
>     http://wiki.apache.org/solr/DistributedSearch
>  3) Mark Butler's "Distributed Lucene" at
>     http://wiki.apache.org/hadoop/DistributedLucene
> 
> We have also been working on a Lucene-based distributed index architecture.
> Our design differs from the above proposals in the way it leverages Hadoop
> as much as possible. In particular, HDFS is used to reliably store Lucene
> instances, Map/Reduce is used to analyze documents and update Lucene instances
> in parallel, and Hadoop's IPC framework is used. Our design is geared for
> applications that require a highly scalable index and where batch updates
> to each Lucene instance are acceptable (versus finer-grained
> document-at-a-time updates).
> 
> We have a working implementation of our design and are in the process
> of evaluating its performance. An overview of our design is provided below.
> We welcome feedback and would like to know if you are interested in working
> on it. If so, we would be happy to make the code publicly available. At the
> same time, we would like to collaborate with people working on existing
> proposals and see if we can consolidate our efforts.
> 
> TERMINOLOGY
> A distributed "index" is partitioned into "shards". Each shard corresponds to
> a Lucene instance and contains a disjoint subset of the documents in the index.
> Each shard is stored in HDFS and served by one or more "shard servers". Here
> we only talk about a single distributed index, but in practice multiple indexes
> can be supported.
> 
> A "master" keeps track of the shard servers and the shards being served by
> them. An "application" updates and queries the global index through an
> "index client". An index client communicates with the shard servers to
> execute a query.
> 
> KEY RPC METHODS
> This section lists the key RPC methods in our design. To simplify the
> discussion, some of their parameters have been omitted.
> 
>   On the Shard Servers
>     // Execute a query on this shard server's Lucene instance.
>     // This method is called by an index client.
>     SearchResults search(Query query);
> 
>   On the Master
>     // Tell the master to update the shards, i.e., Lucene instances.
>     // This method is called by an index client.
>     boolean updateShards(Configuration conf);
> 
>     // Ask the master where the shards are located.
>     // This method is called by an index client.
>     LocatedShards getShardLocations();
> 
>     // Send a heartbeat to the master. This method is called by a
>     // shard server. In the response, the master informs the
>     // shard server when to switch to a newer version of the index.
>     ShardServerCommand sendHeartbeat();
> 
> QUERYING THE INDEX
> To query the index, an application sends a search request to an index client.
> The index client then calls the shard server search() method for each shard
> of the index, merges the results and returns them to the application. The
> index client caches the mapping between shards and shard servers by
> periodically calling the master's getShardLocations() method.
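
The merge step on the index client can be sketched roughly as follows. This is only an illustration under assumptions: the post does not show what SearchResults contains, so the Hit class, its fields, and the mergeTopK helper are hypothetical names, and ranking by a single score is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class ShardMergeSketch {
    // Hypothetical hit record; the real SearchResults type is not shown in the post.
    static final class Hit {
        final String shard;
        final int docId;
        final float score;

        Hit(String shard, int docId, float score) {
            this.shard = shard;
            this.docId = docId;
            this.score = score;
        }
    }

    // Keep the k highest-scoring hits across all per-shard result lists.
    static List<Hit> mergeTopK(List<List<Hit>> perShardHits, int k) {
        Comparator<Hit> byScore = Comparator.comparingDouble(h -> h.score);
        // Min-heap on score: the weakest retained hit is evicted first.
        PriorityQueue<Hit> heap = new PriorityQueue<>(byScore);
        for (List<Hit> hits : perShardHits) {
            for (Hit h : hits) {
                heap.add(h);
                if (heap.size() > k) {
                    heap.poll();
                }
            }
        }
        List<Hit> merged = new ArrayList<>(heap);
        merged.sort(byScore.reversed()); // best hit first
        return merged;
    }

    public static void main(String[] args) {
        List<Hit> shard0 = Arrays.asList(new Hit("shard0", 7, 0.9f), new Hit("shard0", 3, 0.4f));
        List<Hit> shard1 = Arrays.asList(new Hit("shard1", 2, 0.7f));
        for (Hit h : mergeTopK(Arrays.asList(shard0, shard1), 2)) {
            System.out.println(h.shard + " doc " + h.docId + " score " + h.score);
        }
    }
}
```

In a real client the per-shard lists would come from search() calls issued to the shard servers, most likely in parallel; that part is left out here.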
> 
> UPDATING THE INDEX USING MAP/REDUCE
> To update the index, an application sends an update request to an index
> client. The index client then calls the master's updateShards() method, which
> schedules a Map/Reduce job to update the index. The Map/Reduce job updates
> the shards in parallel and copies the new index files of each shard (i.e.,
> Lucene instance) to HDFS.
> 
> The updateShards() method includes a "configuration", which provides
> information for updating the shards. More specifically, the configuration
> includes the following information:
>   - Input path. This provides the location of updated documents, e.g., HDFS
>     files or directories, or HBase tables.
>   - Input formatter. This specifies how to format the input documents.
>   - Analysis. This defines the analyzer to use on the input. The analyzer
>     determines whether a document is being inserted, updated, or deleted. For
>     inserts or updates, the analyzer also converts each input document into
>     a Lucene document.
> 
> The Map phase of the Map/Reduce job formats and analyzes the input (in
> parallel), while the Reduce phase collects and applies the updates to each
> Lucene instance (again in parallel). The updates are applied using the local
> file system where a Reduce task runs and then copied back to HDFS. For
> example, if the updates caused a new Lucene segment to be created, the new
> segment would be created on the local file system first, and then copied
> back to HDFS.
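
To make the two phases concrete, here is a plain-Java sketch that does not use the Hadoop API. Everything in it is an assumption for illustration: the tab-separated input format, the Update class, routing documents to shards by hashing the document id (the post does not say how documents are assigned to shards), and an in-memory map standing in for a Lucene instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class ShardUpdateSketch {
    enum Op { INSERT, UPDATE, DELETE }

    // Hypothetical update record produced by the analysis step.
    static final class Update {
        final Op op;
        final String docId;
        final String body;

        Update(Op op, String docId, String body) {
            this.op = op;
            this.docId = docId;
            this.body = body;
        }
    }

    // Assumption: a document is routed to a shard by hashing its id.
    static int shardFor(String docId, int numShards) {
        return Math.floorMod(docId.hashCode(), numShards);
    }

    // "Map" side: parse one line of the assumed form op<TAB>docId<TAB>body.
    static Update analyze(String line) {
        String[] fields = line.split("\t", 3);
        Op op = Op.valueOf(fields[0].toUpperCase(Locale.ROOT));
        String body = fields.length > 2 ? fields[2] : "";
        return new Update(op, fields[1], body);
    }

    // Shuffle stand-in: group the analyzed updates by target shard.
    static Map<Integer, List<Update>> groupByShard(List<String> lines, int numShards) {
        Map<Integer, List<Update>> byShard = new TreeMap<>();
        for (String line : lines) {
            Update u = analyze(line);
            byShard.computeIfAbsent(shardFor(u.docId, numShards), s -> new ArrayList<>()).add(u);
        }
        return byShard;
    }

    // "Reduce" side: apply one shard's updates in order to its stand-in index.
    static void apply(Map<String, String> shard, List<Update> updates) {
        for (Update u : updates) {
            if (u.op == Op.DELETE) {
                shard.remove(u.docId);
            } else {
                shard.put(u.docId, u.body); // insert and update both overwrite
            }
        }
    }

    public static void main(String[] args) {
        List<String> input = new ArrayList<>();
        input.add("insert\ta\thello");
        input.add("insert\tb\tworld");
        input.add("delete\ta");
        Map<String, String> shard = new HashMap<>();
        apply(shard, groupByShard(input, 1).get(0));
        System.out.println(shard.keySet());
    }
}
```

In the design described above, the apply step would instead write Lucene segment files on the Reduce task's local file system and then copy them to HDFS.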
> 
> When the Map/Reduce job completes, a "new version" of the index is ready to be
> queried. It is important to note that the new version of the index is not
> derived from scratch. By leveraging Lucene's update algorithm, the new version
> of each Lucene instance will share as many files as possible with the
> previous version.
> 
> ENSURING INDEX CONSISTENCY
> At any point in time, an index client always has a consistent view of the
> shards in the index. The results of a search query include either all or none
> of a recent update to the index. The details of the algorithm to accomplish
> this are omitted here, but the basic flow is pretty simple.
> 
> After the Map/Reduce job to update the shards completes, the master will tell
> each shard server to "prepare" the new version of the index. After all the
> shard servers have responded affirmatively to the "prepare" message, the new
> index is ready to be queried. An index client will then lazily learn about
> the new index when it makes its next getShardLocations() call to the master.
> In essence, a lazy two-phase commit protocol is used, with "prepare" and
> "commit" messages piggybacked on heartbeats. After a shard has switched to
> the new index, the Lucene files in the old index that are no longer needed
> can safely be deleted.
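
Since the details are omitted, the following is only one way the master-side bookkeeping for such a lazy two-phase commit might look. The class name, the Command values, the heartbeat parameters, and the version numbering are all assumptions for illustration; getLiveVersion() stands in for the version an index client would learn from getShardLocations().

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class LazyCommitSketch {
    enum Command { NONE, PREPARE, COMMIT }

    private static final long NO_VERSION = -1;

    private final Set<String> servers;
    private final Set<String> prepared = new HashSet<>();
    private final Set<String> awaitingCommit = new HashSet<>();
    private long liveVersion = 1;
    private long pendingVersion = NO_VERSION;

    LazyCommitSketch(Set<String> servers) {
        this.servers = new HashSet<>(servers);
    }

    // Called when the Map/Reduce update job has finished writing to HDFS.
    void updateJobCompleted() {
        pendingVersion = liveVersion + 1;
        prepared.clear();
    }

    // Heartbeat from a shard server, reporting the newest version it has prepared.
    Command sendHeartbeat(String server, long newestPreparedVersion) {
        if (pendingVersion != NO_VERSION) {
            if (newestPreparedVersion < pendingVersion) {
                return Command.PREPARE; // phase 1: ask the server to prepare
            }
            prepared.add(server);
            if (!prepared.containsAll(servers)) {
                return Command.NONE; // still waiting for other servers
            }
            // Every server is prepared: the new version becomes visible.
            liveVersion = pendingVersion;
            pendingVersion = NO_VERSION;
            prepared.clear();
            awaitingCommit.addAll(servers);
        }
        // Phase 2: tell each server once that it may drop the old version.
        return awaitingCommit.remove(server) ? Command.COMMIT : Command.NONE;
    }

    // What an index client would observe on its next location refresh.
    long getLiveVersion() {
        return liveVersion;
    }

    public static void main(String[] args) {
        LazyCommitSketch master = new LazyCommitSketch(new HashSet<>(Arrays.asList("s1", "s2")));
        master.updateJobCompleted();
        System.out.println(master.sendHeartbeat("s1", 1)); // PREPARE
        System.out.println(master.sendHeartbeat("s1", 2)); // NONE
        System.out.println(master.sendHeartbeat("s2", 2)); // COMMIT
        System.out.println(master.getLiveVersion());       // 2
    }
}
```

The sketch assumes a prepared shard server can answer queries for both the old and the new version until it receives the commit, so clients that have not yet refreshed their shard locations still see a consistent index.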
> 
> ACHIEVING FAULT-TOLERANCE
> We rely on the fault-tolerance of Map/Reduce to guarantee that an index update
> will eventually succeed. All shards are stored in HDFS and can be read by any
> shard server in a cluster. For a given shard, if one of its shard servers
> dies, new search requests are handled by its surviving shard servers. To
> ensure that there is always enough coverage for a shard, the master will
> instruct other shard servers to take over the shards of a dead shard server.
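
As a rough illustration of the takeover step, the sketch below reassigns the shards of a dead server to the least-loaded surviving servers. The post does not describe the actual policy, so the target replica count, the least-loaded rule, and all names here are assumptions.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class ShardCoverageSketch {
    private final Map<String, Set<String>> serversByShard = new TreeMap<>();
    private final Set<String> liveServers = new TreeSet<>();
    private final int targetReplicas; // hypothetical coverage target per shard

    ShardCoverageSketch(int targetReplicas) {
        this.targetReplicas = targetReplicas;
    }

    void assign(String shard, String server) {
        liveServers.add(server);
        serversByShard.computeIfAbsent(shard, s -> new TreeSet<>()).add(server);
    }

    Set<String> serversOf(String shard) {
        return serversByShard.get(shard);
    }

    // Number of shards a server is currently serving.
    private int load(String server) {
        int n = 0;
        for (Set<String> holders : serversByShard.values()) {
            if (holders.contains(server)) {
                n++;
            }
        }
        return n;
    }

    // Least-loaded live server that does not already serve the shard, or null.
    private String pickReplacement(Set<String> holders) {
        String best = null;
        for (String candidate : liveServers) {
            if (holders.contains(candidate)) {
                continue;
            }
            if (best == null || load(candidate) < load(best)) {
                best = candidate;
            }
        }
        return best;
    }

    // Drop the dead server and restore coverage for each under-covered shard.
    void serverDied(String dead) {
        liveServers.remove(dead);
        for (Set<String> holders : serversByShard.values()) {
            holders.remove(dead);
            while (holders.size() < targetReplicas) {
                String replacement = pickReplacement(holders);
                if (replacement == null) {
                    break; // not enough live servers to reach the target
                }
                holders.add(replacement);
            }
        }
    }

    public static void main(String[] args) {
        ShardCoverageSketch master = new ShardCoverageSketch(2);
        master.assign("shard0", "A");
        master.assign("shard0", "B");
        master.assign("shard1", "A");
        master.assign("shard1", "C");
        master.serverDied("A");
        System.out.println(master.serversOf("shard0") + " " + master.serversOf("shard1"));
    }
}
```

Because every shard lives in HDFS, a replacement server in this scheme only needs to open the shard's files; no data has to be copied from the dead server.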
> 
> PERFORMANCE ISSUES
> Currently, each shard server reads a shard directly from HDFS. Experiments
> have shown that this approach does not perform very well, with HDFS causing
> Lucene to slow down fairly dramatically (by well over 5x when data blocks are
> accessed over the network). Consequently, we are exploring different ways to
> leverage the fault tolerance of HDFS and, at the same time, work around its
> performance problems. One simple alternative is to add a local file system
> cache on each shard server. Another alternative is to modify HDFS so that an
> application has more control over where to store the primary and replicas of
> an HDFS block. This feature may be useful for other HDFS applications (e.g.,
> HBase). We would like to collaborate with other people who are interested in
> adding this feature to HDFS.
> 
> 
> Regards,
> Ning Li