Posted to user@hbase.apache.org by "Sudarshan Kadambi (BLOOMBERG/ 731 LEXIN)" <sk...@bloomberg.net> on 2013/04/25 23:44:08 UTC

Coprocessors

Folks:

This is my first post on the HBase user mailing list. 

I have the following scenario:
I have an HBase table of up to a billion keys. I'm looking to support an application where, on some user action, I'd need to fetch multiple columns for up to 250K keys and do some sort of aggregation on them. Fetching all that data and doing the aggregation in my application takes about a minute.

I'm looking to co-locate the aggregation logic with the region servers to
a. Distribute the aggregation
b. Avoid having to fetch large amounts of data over the network (this could potentially be cross-datacenter)

Neither observers nor aggregation endpoints work for this use case. Observers don't return data back to the client, while aggregation endpoints work in the context of scans, not a multi-get. (Are these correct assumptions?)

I'm looking to write a service that runs alongside the region servers and acts as a proxy between my application and the region servers.

I plan to use the logic in the HBase client's HConnectionManager to segment my request of 1M rowkeys into sub-requests per region server. These are sent over to the proxy, which fetches the data from the region server, aggregates locally and sends the data back. Does this sound reasonable, or even like a useful thing to pursue?
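
The segmentation step described above can be sketched roughly as follows. This is only an illustration under stated assumptions, not HBase client code: region boundaries are modeled as a sorted map from region start key to a hypothetical region name, keys are plain Strings rather than byte arrays, and the class and method names (RowKeyGrouper, groupByRegion) are made up for the example. In a real client the boundaries would come from the HBase client's region lookup, which is not shown here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RowKeyGrouper {
    /**
     * Groups row keys by the region whose key range contains them.
     * regionsByStartKey maps each region's start key to a (hypothetical) region name
     * and must contain the empty string as the start key of the first region.
     */
    static Map<String, List<String>> groupByRegion(
            TreeMap<String, String> regionsByStartKey, List<String> rowKeys) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String rowKey : rowKeys) {
            // The owning region is the one with the greatest start key <= rowKey.
            Map.Entry<String, String> region = regionsByStartKey.floorEntry(rowKey);
            grouped.computeIfAbsent(region.getValue(), r -> new ArrayList<>()).add(rowKey);
        }
        return grouped;
    }

    public static void main(String[] args) {
        TreeMap<String, String> regions = new TreeMap<>();
        regions.put("", "region-1");   // first region starts at the empty key
        regions.put("g", "region-2");
        regions.put("p", "region-3");
        // Each sub-list could then be sent as one sub-request.
        System.out.println(groupByRegion(regions, Arrays.asList("apple", "grape", "kiwi", "zebra")));
    }
}
```

Grouping per region server rather than per region would need one more lookup from region to hosting server, which this sketch leaves out.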

Regards,
-sudarshan

Re: Coprocessors

Posted by Viral Bajaria <vi...@gmail.com>.
Phoenix might be able to solve the problem if the keys are structured in
the binary format that it understands; otherwise you are better off
reloading that data into a table created via Phoenix. But I will let James
tackle this question.

Regarding your use case, why can't you do the aggregation using observers?
You should be able to do the aggregation and return a new Scanner to your
client.

And Lars is right about the range scans that Phoenix does. It does restrict
things and also will do parallel scans for you based on what you
select/filter.

-Viral


On Thu, Apr 25, 2013 at 3:12 PM, Michael Segel <mi...@hotmail.com> wrote:

> I don't think Phoenix will solve his problem.
>
> He also needs to explain more about his problem before we can start to
> think about the problem.
>
>
> On Apr 25, 2013, at 4:54 PM, lars hofhansl <la...@apache.org> wrote:
>
> > You might want to have a look at Phoenix
> > (https://github.com/forcedotcom/phoenix), which does that and more, and
> > gives a SQL/JDBC interface.
> >
> > -- Lars

Re: Coprocessors

Posted by Michael Segel <mi...@hotmail.com>.
I don't think Phoenix will solve his problem. 

He also needs to explain more about his problem before we can start to think about the problem. 


On Apr 25, 2013, at 4:54 PM, lars hofhansl <la...@apache.org> wrote:

> You might want to have a look at Phoenix (https://github.com/forcedotcom/phoenix), which does that and more, and gives a SQL/JDBC interface.
> 
> -- Lars


Re: Coprocessors

Posted by lars hofhansl <la...@apache.org>.
You might want to have a look at Phoenix (https://github.com/forcedotcom/phoenix), which does that and more, and gives a SQL/JDBC interface.

-- Lars




Re: Coprocessors

Posted by James Taylor <jt...@salesforce.com>.
On 04/25/2013 03:35 PM, Gary Helmling wrote:
>> I'm looking to write a service that runs alongside the region servers and
>> acts as a proxy between my application and the region servers.
>>
>> I plan to use the logic in HBase client's HConnectionManager, to segment
>> my request of 1M rowkeys into sub-requests per region-server. These are
>> sent over to the proxy which fetches the data from the region server,
>> aggregates locally and sends data back. Does this sound reasonable or even
>> a useful thing to pursue?
>>
>>
> This is essentially what coprocessor endpoints (called through
> HTable.coprocessorExec()) do.  (One difference is that there is a
> parallel request per region, not per region server, though that is a
> potential optimization that could be made as well.)
>
> The tricky part I see for the case you describe is splitting your full set
> of row keys up correctly per region.  You could send the full set of row
> keys to each endpoint invocation, and have the endpoint implementation
> filter down to only those keys present in the current region.  But that
> would be a lot of overhead on the request side.  You could split the row
> keys into per-region sets on the client side, but I'm not sure we provide
> sufficient context for the Batch.Callable instance you provide to
> coprocessorExec() to determine which region it is being invoked against.

Sudarshan,
In our head branch of Phoenix (we're targeting this for a 1.2 release in 
two weeks), we've implemented a skip scan filter that functions similarly 
to a batched get, except:
1) it's more flexible in that it can jump not only from a single key to 
another single key, but also from range to range
2) it's faster, about 3-4x.
3) you can use it in combination with aggregation, since it's a filter

The scan is chunked up by region and only the keys in each region are 
sent, along the lines that you and Gary have described. Then the results 
are merged together by the client automatically.
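
To illustrate the general idea of a scan that jumps between wanted keys instead of reading every row, here is a small in-memory sketch. It is not Phoenix's implementation: the table is modeled as a sorted map, the wanted keys as a sorted set, and the names (SkipScanSketch, countMatching) are hypothetical. The point is only that, on a non-matching row, the scan seeks straight to the next wanted key rather than stepping row by row.

```java
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class SkipScanSketch {
    /** Counts rows whose key is in wantedKeys, seeking past the gaps between wanted keys. */
    static long countMatching(NavigableMap<Long, String> table, NavigableSet<Long> wantedKeys) {
        long count = 0;
        Long current = table.isEmpty() ? null : table.firstKey();
        while (current != null) {
            if (wantedKeys.contains(current)) {
                count++;
                current = table.higherKey(current);   // matched: move to the next row
            } else {
                // Not wanted: the "hint" is the smallest wanted key >= the current row.
                Long hint = wantedKeys.ceiling(current);
                if (hint == null) {
                    break;                             // no wanted keys remain
                }
                current = table.ceilingKey(hint);      // seek directly to the hint
            }
        }
        return count;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> table = new TreeMap<>();
        for (long k = 1; k <= 10; k++) {
            table.put(k, "row-" + k);
        }
        NavigableSet<Long> wanted = new TreeSet<>(Arrays.asList(3L, 7L, 42L));
        System.out.println(countMatching(table, wanted)); // keys 3 and 7 exist, 42 does not
    }
}
```

Because the aggregation (here a count) happens inside the same pass, only the aggregate needs to travel back to the caller.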

How would you decompose your row key into columns? Is there a time 
component? Let me walk you through an example where you might have a 
LONG id value plus perhaps a timestamp (it works equally well if you only 
have a single column in your PK). If you provide a bit more info on your 
use case, I can tailor it more exactly.

Create a schema:
     CREATE TABLE t (
         key BIGINT NOT NULL,
         ts DATE NOT NULL,
         data VARCHAR
         CONSTRAINT pk PRIMARY KEY (key, ts));

Populate your data using our UPSERT statement.

Aggregate over a set of keys like this:

     SELECT count(*) FROM t WHERE key IN (?,?,?) AND ts > ? AND ts < ?

where you bind the ? at runtime (probably building the statement 
programmatically based on how many keys you're binding).
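
Building the statement programmatically might look like the sketch below. It assumes the table t and the columns key and ts from the example schema; the class and method names (InListQueryBuilder, buildAggregateQuery, bind) are hypothetical, and obtaining the JDBC Connection is left out. Only standard java.sql calls are used.

```java
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

public class InListQueryBuilder {
    /** Builds the count query with one "?" placeholder per key in the IN list. */
    static String buildAggregateQuery(int keyCount) {
        if (keyCount < 1) {
            throw new IllegalArgumentException("need at least one key");
        }
        String placeholders = String.join(",", Collections.nCopies(keyCount, "?"));
        return "SELECT count(*) FROM t WHERE key IN (" + placeholders
                + ") AND ts > ? AND ts < ?";
    }

    /** Binds the keys first, then the lower and upper timestamp bounds. */
    static void bind(PreparedStatement stmt, List<Long> keys, Date from, Date to)
            throws SQLException {
        int index = 1; // JDBC parameters are 1-based
        for (long key : keys) {
            stmt.setLong(index++, key);
        }
        stmt.setDate(index++, from);
        stmt.setDate(index, to);
    }

    public static void main(String[] args) {
        System.out.println(buildAggregateQuery(3));
    }
}
```

With a connection in hand, the usual pattern would be connection.prepareStatement(buildAggregateQuery(keys.size())), then bind(...), then executeQuery().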

Then Phoenix would jump around the key space of your table using the 
seek-next hint feature provided by HBase filters 
(SEEK_NEXT_USING_HINT). You'd just use the regular JDBC ResultSet to get 
your count back.

If you want more info and/or a benchmark of seeking over 250K keys in a 
billion row table, let me know.

Thanks,

James

Re: Coprocessors

Posted by Gary Helmling <gh...@gmail.com>.
> I'm looking to write a service that runs alongside the region servers and
> acts as a proxy between my application and the region servers.
>
> I plan to use the logic in HBase client's HConnectionManager, to segment
> my request of 1M rowkeys into sub-requests per region-server. These are
> sent over to the proxy which fetches the data from the region server,
> aggregates locally and sends data back. Does this sound reasonable or even
> a useful thing to pursue?
>
>
This is essentially what coprocessor endpoints (called through
HTable.coprocessorExec()) do.  (One difference is that there is a
parallel request per region, not per region server, though that is a
potential optimization that could be made as well.)

The tricky part I see for the case you describe is splitting your full set
of row keys up correctly per region.  You could send the full set of row
keys to each endpoint invocation, and have the endpoint implementation
filter down to only those keys present in the current region.  But that
would be a lot of overhead on the request side.  You could split the row
keys into per-region sets on the client side, but I'm not sure we provide
sufficient context for the Batch.Callable instance you provide to
coprocessorExec() to determine which region it is being invoked against.
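
The first option above, where every endpoint invocation receives the full key set and keeps only the keys in its own region, could be sketched like this. It is a plain-Java illustration, not coprocessor code: keys are modeled as Strings instead of byte arrays, the names (RegionKeyFilter, keysInRegion) are hypothetical, and the region's start and end keys are passed in directly rather than read from the region's metadata. It relies on the HBase convention that a region covers [startKey, endKey) and that an empty end key means the last region.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RegionKeyFilter {
    /**
     * Returns the requested keys that fall inside [startKey, endKey).
     * An empty endKey is treated as "no upper bound" (the last region).
     */
    static List<String> keysInRegion(List<String> requestedKeys, String startKey, String endKey) {
        List<String> local = new ArrayList<>();
        for (String key : requestedKeys) {
            boolean atOrAfterStart = key.compareTo(startKey) >= 0;
            boolean beforeEnd = endKey.isEmpty() || key.compareTo(endKey) < 0;
            if (atOrAfterStart && beforeEnd) {
                local.add(key);
            }
        }
        return local;
    }

    public static void main(String[] args) {
        List<String> requested = Arrays.asList("a", "g", "m", "p", "z");
        System.out.println(keysInRegion(requested, "g", "p")); // middle region
        System.out.println(keysInRegion(requested, "p", ""));  // last region
    }
}
```

The cost noted above still applies: with this approach the whole key list is shipped to every region, so the request-side overhead grows with the number of regions.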