Posted to solr-user@lucene.apache.org by tedsolr <ts...@sciquest.com> on 2016/07/20 21:29:30 UTC

Re: Specify sorting of merged streams

I am getting an OOM error trying to combine streaming operations. I think the
sort is the issue. This test was done on a single replica cloud setup of
v6.1 with 4GB heap. col1 has 1M docs. col2 has 10k docs. The search for each
collection was q=*:*. Using SolrJ:

// comparator and reduceOp (a ReduceOperation) are defined elsewhere (not shown).

// Base search from the user.
CloudSolrStream searchStream = new CloudSolrStream(zkHosts, "col1", map);
// Group docs by a field list.
ReducerStream reducer = new ReducerStream(searchStream, comparator, reduceOp);
// Re-sort the results so they can be compared against data from another collection.
SortStream idSort = new SortStream(reducer,
    new FieldComparator("id", ComparatorOrder.ASCENDING));
CloudSolrStream ruleStream = new CloudSolrStream(zkHosts, "col2", map);
// Intersect on id; this acts as a filter.
TupleStream stream = new IntersectStream(idSort, ruleStream, new FieldEqualitor("id"));

When I open the stream it works for about 30 seconds then dies. So a single
user search on a very small collection (1M docs) can overwhelm a 4GB heap.
And this chain isn't even done! I still need to merge with yet a third
collection and then re-sort with the user's specified sort parameter.

Is there something fundamentally wrong with my approach?
thanks, Ted



Joel Bernstein wrote
> Hi,
> 
> The streaming API in Solr 6x has been expanded to support many different
> parallel computing workloads. For example the topic stream supports
> pub/sub messaging. The gatherNodes stream supports graph traversal. The facet
> stream supports aggregations inside the search engine, while the rollup
> stream supports shuffling map / reduce aggregations. Stored queries and
> large scale alerting are on the way...
> 
> The sort stream is designed to be used at scale in parallel mode. It can
> currently sort about 1,000,000 docs per second on a single worker. So if
> you have 20 workers it can sort 20,000,000 docs per second. The plan is to
> eventually switch to the fork/join merge sort so that you get parallelism
> within the same worker.
> 
> 
> 
> Joel Bernstein
> http://joelsolr.blogspot.com/





--
View this message in context: http://lucene.472066.n3.nabble.com/Specify-sorting-of-merged-streams-tp4285026p4288083.html
Sent from the Solr - User mailing list archive at Nabble.com.

Re: Specify sorting of merged streams

Posted by tedsolr <ts...@sciquest.com>.
The primary use case seems to require a SortStream. Ignoring the large join
for now...
1. search main collection with stream (a)
2. search other collection with stream (b)
3. hash join a & b (c)
4. full sort on c
5. aggregate c with reducer
6. apply user sort criteria with top

It's very likely that a field from 'b' will be in the user's search criteria
and the results list. Therefore the merge (3) must occur before the
aggregation (5), and the aggregation requires the sort (4). If this can't be
done without lots of hardware and memory, perhaps I'd be better off leaving
the data denormalized and increasing index speed by sharding (with many more
VMs). Will sharding increase re-indexing speed by a factor close to the # of
shards (collection with 10 shards indexes ~10x faster than same collection
with one shard)? 
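
To illustrate why the aggregation (5) depends on the sort (4): a one-pass streaming reducer only compares each tuple with the one before it, so tuples with the same group key must arrive next to each other. The following is a minimal sketch in plain Java, not SolrJ; the class and method names are made up for illustration and the keys are sample values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration only: groups keys the way a one-pass streaming reducer would,
// starting a new group whenever the key differs from the previous key.
class AdjacentGroupingSketch {

    static List<String> groupAdjacent(List<String> keys) {
        List<String> groups = new ArrayList<>();
        String current = null;
        int count = 0;
        for (String key : keys) {
            if (current != null && !key.equals(current)) {
                // Key changed: emit the finished group and start a new one.
                groups.add(current + "=" + count);
                count = 0;
            }
            current = key;
            count++;
        }
        if (current != null) {
            groups.add(current + "=" + count);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> unsorted = Arrays.asList("acme", "zeta", "acme");
        List<String> sorted = new ArrayList<>(unsorted);
        Collections.sort(sorted);
        System.out.println(groupAdjacent(unsorted)); // [acme=1, zeta=1, acme=1]
        System.out.println(groupAdjacent(sorted));   // [acme=2, zeta=1]
    }
}
```

With unsorted input the "acme" group is split in two, which is why the stream feeding the reducer has to be sorted on the grouping fields first.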



Re: Specify sorting of merged streams

Posted by Joel Bernstein <jo...@gmail.com>.
The tricky thing you have is a large join coupled with a reduce() group,
which have different sorts.

If you have a big enough cluster with enough workers, shards and replicas
you can make this work.

For example if you partition the large join across 30 workers, the hash
join would fit in the available RAM across the workers. Same goes for the
in-memory sort, if you partition the sort across 30 workers then the sort
would work for you because you'd have enough RAM in total to do the sort.
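
A rough sketch of the partitioning idea, in plain Java rather than SolrJ: each tuple is assigned to a worker by hashing its partition key, so every worker holds only a slice of the data and all tuples with the same key land on the same worker. The class name, the hash function and the document counts are assumptions for illustration; this is not how Solr computes its partitions internally.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: hash-partition keys across a fixed number of workers.
class PartitionSketch {

    // Picks a worker for a key. Math.floorMod keeps the result non-negative
    // even when hashCode() is negative.
    static int workerFor(String key, int numWorkers) {
        return Math.floorMod(key.hashCode(), numWorkers);
    }

    // Splits the keys into one slice per worker.
    static List<List<String>> partition(List<String> keys, int numWorkers) {
        List<List<String>> slices = new ArrayList<>();
        for (int i = 0; i < numWorkers; i++) {
            slices.add(new ArrayList<>());
        }
        for (String key : keys) {
            slices.get(workerFor(key, numWorkers)).add(key);
        }
        return slices;
    }

    public static void main(String[] args) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 300000; i++) {
            ids.add("doc" + i);
        }
        List<List<String>> slices = partition(ids, 30);
        int largest = 0;
        for (List<String> slice : slices) {
            largest = Math.max(largest, slice.size());
        }
        System.out.println("largest slice: " + largest + " of " + ids.size());
    }
}
```

Each worker then sorts or joins only its own slice, so the memory needed per worker shrinks roughly in proportion to the number of workers, provided the keys spread evenly.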

The key to the performance is how fast you can export the data. For example
if you have a join with 20 million documents from one collection and you
want to do it in sub-second, you have to be able to export faster than 20
million documents per second. You increase your export performance by
adding shards, replicas and workers.
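
The export arithmetic can be sketched as a back-of-the-envelope calculation. The per-node export rate below is a hypothetical placeholder, not a measured Solr figure, and the sketch assumes throughput scales linearly with the number of nodes, which is optimistic.

```java
// Illustration only: how many exporting nodes a time budget implies.
class ExportRateSketch {

    // Docs per second the cluster must export to finish within the budget.
    static double requiredRate(long docs, double budgetSeconds) {
        return docs / budgetSeconds;
    }

    // Nodes needed if each node exports perNodeRate docs per second and
    // throughput scales linearly with node count (an assumption).
    static long nodesNeeded(long docs, double budgetSeconds, double perNodeRate) {
        return (long) Math.ceil(requiredRate(docs, budgetSeconds) / perNodeRate);
    }

    public static void main(String[] args) {
        long docs = 20_000_000L;                 // join size from the example above
        double assumedPerNodeRate = 1_000_000.0; // hypothetical; measure your own
        System.out.println(nodesNeeded(docs, 1.0, assumedPerNodeRate)); // 20
        System.out.println(nodesNeeded(docs, 2.0, assumedPerNodeRate)); // 10
    }
}
```

Relaxing the response-time budget from one second to two halves the number of nodes in this model, which is the trade-off between speed and hardware described above.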

There is a document in the works that describes how to scale the MapReduce
architecture. I'll attempt to get this finished for the 6.2 release:

https://cwiki.apache.org/confluence/display/solr/MapReduce%2C+Shuffling+and+Worker+Collections

The 12 concurrent queries also need to be explored because if you're doing
12 simultaneous large distributed joins, that will create significant
network traffic.

The MapReduce functions of the Streaming API were originally designed for
interactive data exploration on large data sets. So it serves the same use
case as Hive, Pig or Impala.

The non-MapReduce functions in the streaming expression library are
designed for higher QPS OLTP use cases. The facet(), stats() and
gatherNodes() functions are examples of functions that can operate at a
higher QPS.

But, if you have a large enough cluster and the distributed joins are not
too large, then you could probably run the distributed joins with a
moderate level of QPS.

Joel Bernstein
http://joelsolr.blogspot.com/


Re: Specify sorting of merged streams

Posted by tedsolr <ts...@sciquest.com>.
I can see I may need to rethink some things. I have two joins: one is 1 to 1
(very large) and one is 1 to .03. A HashJoin may work on the smaller one.
The large join looks like it may not be possible. I could get away with
treating it as a filter somehow, since I don't need the fields from the
col2 documents. For example: include the col1 document (id=123) only if
col2 contains a document with id=123.
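
The filter described here can be sketched as a merge-style intersect over two id streams that are both sorted ascending by id. This is plain Java for illustration, not Solr's IntersectStream implementation; the class name and sample ids are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustration only: keep ids from the left stream that also occur on the right.
class SortedIntersectSketch {

    // Both inputs must be sorted ascending by id. Only the current element of
    // each side is held at a time, so working memory does not grow with input
    // size (the result list is built here only for demonstration).
    static List<Integer> intersect(Iterator<Integer> left, Iterator<Integer> right) {
        List<Integer> kept = new ArrayList<>();
        if (!right.hasNext()) {
            return kept;
        }
        int r = right.next();
        while (left.hasNext()) {
            int l = left.next();
            // Advance the right side until it catches up with the left id.
            while (r < l && right.hasNext()) {
                r = right.next();
            }
            if (l == r) {
                kept.add(l);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Iterator<Integer> col1Ids = Arrays.asList(1, 2, 3, 5, 8).iterator();
        Iterator<Integer> col2Ids = Arrays.asList(2, 5, 9).iterator();
        System.out.println(intersect(col1Ids, col2Ids)); // [2, 5]
    }
}
```

The catch is the precondition: both sides have to arrive sorted on id, so if the left side was sorted on something else for grouping, it needs a re-sort before this step.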

This whole chain is a real-time user search. A 1-2 sec response would be
ideal, but I'm sacrificing speed in order to get the reindexing to run much
faster.

Concurrency is low - like a dozen. Have you read any blogs on balancing #
shards vs # replicas? Any guidelines on estimating the number of VMs this
may require would be great.



Re: Specify sorting of merged streams

Posted by Joel Bernstein <jo...@gmail.com>.
A few other things for you to consider:

1) How big are the joins?
2) How fast do they need to go?
3) How many queries need to run concurrently?

#1 and #2 will dictate how many shards, replicas and parallel workers are
needed to perform the join. #3 needs to be carefully considered because
MapReduce distributed joins are not going to scale like traditional Solr
queries.

Joel Bernstein
http://joelsolr.blogspot.com/


Re: Specify sorting of merged streams

Posted by Joel Bernstein <jo...@gmail.com>.
One of the things to consider is using a hashJoin on the first and second
joins. If you have one large table and two smaller tables, the hashJoin
makes a lot of sense.

One possible flow would be:

parallel reduce to do the grouping
hashJoin to the second table
hashJoin to the third table

The hashJoins can be done in parallel if the partitionKeys are the same as
the partition keys for the reduce.
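
As a rough illustration of why a hash join suits one large table and a smaller one: the small side is loaded into an in-memory map, and the large side is read once in whatever order it arrives. This is a plain Java sketch, not the Solr hashJoin code; the class name, the row layout ({key, value} arrays) and the sample data are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: inner hash join of a large row stream against a small table.
class HashJoinSketch {

    // Each row is {joinKey, value}. Memory use is proportional to the small
    // side only; the large side does not need to be sorted.
    static List<String> hashJoin(List<String[]> large, List<String[]> small) {
        Map<String, String> lookup = new HashMap<>();
        for (String[] row : small) {
            lookup.put(row[0], row[1]);
        }
        List<String> joined = new ArrayList<>();
        for (String[] row : large) {
            String match = lookup.get(row[0]);
            if (match != null) {
                joined.add(row[0] + ":" + row[1] + ":" + match);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> large = Arrays.asList(
            new String[] {"v1", "order-1"},
            new String[] {"v2", "order-2"},
            new String[] {"v9", "order-3"});
        List<String[]> small = Arrays.asList(
            new String[] {"v1", "Acme"},
            new String[] {"v2", "Birch"});
        System.out.println(hashJoin(large, small)); // [v1:order-1:Acme, v2:order-2:Birch]
    }
}
```

Because the probe side keeps its incoming order, a hash join does not force an extra sort on the large stream the way a merge-style join does.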

To re-sort for the user-specified sort you can use the top() function to
re-sort the top N results in a priority queue.
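
A minimal sketch of the top-N idea with a bounded priority queue, in plain Java (not the Solr top() implementation; the class name and sample values are made up). Only N elements are held at any time, however long the underlying stream is.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustration only: keep the best n elements of a stream in a bounded heap.
class TopNSketch {

    // Returns the n elements that sort first according to 'order', in sorted
    // order. The heap uses the reversed comparator so the worst retained
    // candidate sits on top and can be evicted when a better one arrives.
    static <T> List<T> top(Iterable<T> stream, int n, Comparator<T> order) {
        PriorityQueue<T> heap = new PriorityQueue<>(Math.max(1, n), order.reversed());
        for (T item : stream) {
            heap.offer(item);
            if (heap.size() > n) {
                heap.poll(); // drop the current worst candidate
            }
        }
        List<T> result = new ArrayList<>(heap);
        result.sort(order);
        return result;
    }

    public static void main(String[] args) {
        List<String> manufacturers = Arrays.asList("Zenith", "Acme", "Nadir", "Birch");
        System.out.println(top(manufacturers, 2, Comparator.<String>naturalOrder())); // [Acme, Birch]
    }
}
```

This is why top() can apply the user's sort without buffering the whole stream: memory is bounded by N rather than by the number of tuples.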

The sort() should not be used if you don't have enough memory to sort the
entire underlying stream.

Joel Bernstein
http://joelsolr.blogspot.com/


Re: Specify sorting of merged streams

Posted by tedsolr <ts...@sciquest.com>.
I'm hoping I'm just not using the streaming API correctly. I have about 30M
docs (~ 15 collections) in production right now that work well with just 4GB
of heap (no streaming). I can't believe streaming would choke on my test
data.

I guess there are 2 primary requirements. Reindexing an entire collection
must be done overnight (so 8 hrs or so). And search must perform reasonably
well with grouped results, stats per group, and the grouping based on a
runtime dynamic set of fields. I have a plugin analytics query that does the
searching now that is passable (couple secs for 10M results, 30 secs for 40M
results), however it doesn't support sharded collections. The reindexing
(with atomic updates) takes ~5 hrs for 10M docs, or 20 hrs for 40M docs. The
problem is that I will soon have customers with 100M docs so my solution
will not scale.
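
The scaling problem can be put in numbers. Both data points above work out to the same rate, so the sketch below extrapolates linearly; that linearity is an assumption, and the class and method names are made up for illustration.

```java
// Illustration only: linear extrapolation of the reindexing times quoted above.
class ReindexEstimateSketch {

    static double docsPerHour(long docs, double hours) {
        return docs / hours;
    }

    static double hoursFor(long docs, double docsPerHour) {
        return docs / docsPerHour;
    }

    public static void main(String[] args) {
        double rate = docsPerHour(10_000_000L, 5.0);  // 2,000,000 docs per hour
        double hours = hoursFor(100_000_000L, rate);  // 50 hours for 100M docs
        double speedup = hours / 8.0;                 // 6.25x needed for an 8-hour window
        System.out.println(rate + " docs/hour, " + hours + " hours, speedup " + speedup);
    }
}
```

At the current rate, 100M docs would take about 50 hours, so reindexing has to get roughly six times faster to fit the overnight window.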

To speed up reindexing I'm trying to normalize the data. The key fields that
get updated represent 1/30th the number of documents. Placing those fields
in a separate collection (unique values only) would allow reindexing to
finish much quicker. Search results must include data from both collections
(there could be a 3rd collection too but I'm trying to simplify here).
Streaming requires the input to be sorted on the same fields used for
grouping or merging, and those two sorts are
different. Then there's the user's sort preference to consider. So what is
the right way to stream (for example) grouped results (on User, Date,
Vendor, Manufacturer) with merged Vendor data, then apply a final sort on
Manufacturer?





Re: Specify sorting of merged streams

Posted by Joel Bernstein <jo...@gmail.com>.
It's likely that the SortStream is the issue. With the sort function you
need enough memory to sort all the tuples coming from the underlying
stream. The sort stream can also be done in parallel so you can split the
tuples from col1 across N worker nodes. This will give you faster sorting
and apply more memory to the sort.

Can you describe your exact use case? Perhaps we can think about a
different Streaming flow that would work for you.

Joel Bernstein
http://joelsolr.blogspot.com/
