Posted to user@phoenix.apache.org by Alex Warshavsky <aw...@salesforce.com> on 2015/09/03 20:05:50 UTC

Re: Apache Pig Integration with Phoenix

Hello all,

Expanding an earlier email conversation to a wider audience to share the
wealth. Please read the email chain below for context if needed.

Follow-up question. Here's a simple Pig script that loads data from Phoenix,
performs an aggregation, and writes the aggregates back to Phoenix. The idea
is to roll up counts every hour, hence the WHERE clause on EVENT_TIME.

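-- 'localhost' stands in for the ZooKeeper quorum.
raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE EVENT_TIME > TO_DATE(\'2015-08-01\', \'yyyy-MM-dd\') AND EVENT_TIME < TO_DATE(\'2015-08-02\', \'yyyy-MM-dd\')'
    USING org.apache.phoenix.pig.PhoenixHBaseLoader('localhost');

by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID);

count_by_user = FOREACH by_org_and_app GENERATE FLATTEN(group) AS
    (TENANT_ID, USER_ID), COUNT(raw);

STORE count_by_user INTO 'hbase://METRICS'
    USING org.apache.phoenix.pig.PhoenixHBaseStorage('localhost');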

For the script above, since GROUP follows LOAD in Pig, is the best approach
to push the group filter into the WHERE clause of the SELECT? I want to
ensure that the subsequent rollups are correct for each TENANT_ID/USER_ID
without loading the whole table.


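-- 'localhost' stands in for the ZooKeeper quorum.
raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE EVENT_TIME > TO_DATE(\'2015-08-01\', \'yyyy-MM-dd\') AND EVENT_TIME < TO_DATE(\'2015-08-02\', \'yyyy-MM-dd\') AND TENANT_ID = \'123456789\''
    USING org.apache.phoenix.pig.PhoenixHBaseLoader('localhost');

by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID);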

The flip side is that the idea was to use Pig to map/reduce aggregations
(rollups) across tenants rather than processing one tenant at a time. If
tenants have to be processed one at a time, a loop will have to be created,
which I was trying to avoid.
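
To make the cross-tenant case concrete, here is a minimal Python sketch (not
Pig, and not how Pig or Phoenix are implemented) of what a GROUP on
(TENANT_ID, USER_ID) followed by COUNT amounts to at the map/reduce level.
The split contents and the helper name rollup are made up for illustration.
It shows that rows for one tenant can sit in different splits and still be
counted under the same key, so one pass produces a separate count for every
tenant/user pair.

```python
from collections import Counter

# Hypothetical input splits: each split is a list of (TENANT_ID, USER_ID) rows.
# Rows for tenant "t1" are deliberately spread across both splits.
splits = [
    [("t1", "u1"), ("t2", "u9"), ("t1", "u1")],
    [("t1", "u2"), ("t1", "u1"), ("t2", "u9")],
]


def rollup(splits):
    """Count rows per (TENANT_ID, USER_ID) key across all splits."""
    counts = Counter()
    for split in splits:      # map side: every split emits its keys
        for key in split:
            counts[key] += 1  # reduce side: one running count per distinct key
    return dict(counts)


count_by_user = rollup(splits)
for (tenant_id, user_id), n in sorted(count_by_user.items()):
    print(tenant_id, user_id, n)
```

The composite key is what keeps tenants apart here; no per-tenant filter or
loop is involved in this sketch.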

Thanks,
--Alex.

On Thu, Aug 27, 2015 at 7:44 PM, Ravi Kiran wrote:

> Hi Alex,
>    I believe Pig applies predicate pushdown at LOAD and, once the data is
> loaded, applies operators like GROUP and COUNT. Hence, the rollup in your
> example above would happen as a subsequent step.
>    Currently, PhoenixHBaseLoader doesn't implement LoadPredicatePushdown,
> so any FILTER operators after LOAD don't get pushed to HBase. In these
> cases, it's best to go with the SELECT query approach and put the predicate
> in the WHERE clause to get the pushdown.
>
> I would also recommend running the EXPLAIN operator on the script to see
> the execution plan.
>
> Sorry for the delay in my response, I just got back from the office.
>
> Regards
> Ravi
>
> On Thu, Aug 27, 2015 at 12:54 PM, Alex Warshavsky wrote:
>
>> Hi Ravi,
>>
>> Thanks a lot for your response. I wanted to clarify the relationship
>> between splits and Pig operations like GROUP, COUNT, and DISTINCT.
>>
>> Let's talk about an example.
>>
>> In the example below, grouping is by TENANT_ID and USER_ID. When LOAD
>> happens, does the split take care of the rollup being calculated for all
>> rows for a given TENANT_ID? Is the data read in chunks (by split logic)
>> and then rolled up using map/reduce for each of the groups? What about
>> cases like DISTINCT (the count(distinct) equivalent), where *all* of the
>> data for a given group needs to be available to perform the final
>> calculation?
>>
>> Effectively, at the map/reduce level we're talking about TENANT_ID and
>> USER_ID being the keys and a reduce happening for each key (COUNT).
>>
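>> -- Calculate number of logins per user for a given day. Rollup by
>> -- TENANT_ID. 'localhost' stands in for the ZooKeeper quorum.
>>
>> raw = LOAD 'hbase://query/SELECT TENANT_ID, USER_ID FROM LOGINS WHERE EVENT_TIME > TO_DATE(\'2015-08-01\', \'yyyy-MM-dd\') AND EVENT_TIME < TO_DATE(\'2015-08-02\', \'yyyy-MM-dd\')'
>>     USING org.apache.phoenix.pig.PhoenixHBaseLoader('localhost');
>>
>> by_org_and_app = GROUP raw BY (TENANT_ID, USER_ID);
>>
>> count_by_user = FOREACH by_org_and_app GENERATE FLATTEN(group) AS
>>     (TENANT_ID, USER_ID), COUNT(raw);
>>
>> STORE count_by_user INTO 'hbase://METRICS'
>>     USING org.apache.phoenix.pig.PhoenixHBaseStorage('localhost');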
>>
>> Cheers,
>> --Alex.
>>
>> On Wed, Aug 26, 2015 at 7:42 PM, Ravi Kiran wrote:
>>
>>> Hi Soumen,
>>>
>>>    Below is the series of steps that happen for a given LOAD operator:
>>>
>>> a) Parse the information passed to the LOAD operator and generate a
>>> SELECT query with the necessary columns requested. You can also pass a
>>> SELECT query directly.
>>> b) From the SELECT query, we generate the QueryPlan and thereby get the
>>> splits. This is all done in PhoenixInputFormat.
>>> c) Each split pulls records from the table. Kindly note the splits don't
>>> map to region boundaries. They just have start and stop keys.
>>> d) We then transform the returned columns into Pig data types. You can
>>> look further into TypeUtil.
>>>
>>> Coming to your questions,
>>>
>>>
>>>    - Could you suggest how the data is being loaded into Pig from
>>>    Phoenix/HBase? We are trying to evaluate whether there is data movement
>>>    from HBase to MapReduce HDFS during the load.
>>>    *RAVI*: No data movement happens from HBase to HDFS. It's a direct
>>>    SCAN from the HBase table.
>>>
>>>    - Is the data processed in chunks? Is there a concept of batchSize
>>>    in the LOAD function similar to STORE batchSize?
>>>    *RAVI*: Yes. 1 chunk is 1 split. I am not sure about the use of
>>>    batchSize, as Pig cannot execute any downstream operators like GROUP or
>>>    JOIN till all data is loaded. Please feel free to reply if I am not on
>>>    the same page as you on this question.
>>>
>>>    - Can the load be reused for multi-query on Phoenix? We are looking
>>>    to process the same load data using multiple Group+Store.
>>>    *RAVI*: I am hoping you are looking into using the multi-query
>>>    optimization of Pig: https://issues.apache.org/jira/browse/PIG-627
>>>    I would definitely recommend using that if you have multiple STOREs.
>>>
>>> Please feel free to reach out to me if you need any information.
>>> HTH.
>>>
>>> Regards
>>> Ravi
>>>
>>> On Wed, Aug 26, 2015 at 4:31 PM, Soumen Bandyopadhyay wrote:
>>>
>>>> Hello Ravi,
>>>>
>>>> We are developers at Salesforce looking into the Pig integration
>>>> <https://phoenix.apache.org/pig_integration.html> with Phoenix. We are
>>>> looking into the LOAD function and have some questions around its
>>>> implementation.
>>>>
>>>>
>>>>    - Could you suggest how the data is being loaded into Pig from
>>>>    Phoenix/HBase? We are trying to evaluate whether there is data movement
>>>>    from HBase to MapReduce HDFS during the load.
>>>>    - Is the data processed in chunks? Is there a concept of batchSize
>>>>    in the LOAD function similar to STORE batchSize?
>>>>    - Can the load be reused for multi-query on Phoenix? We are looking
>>>>    to process the same load data using multiple Group+Store.
>>>>
>>>> Thank you for your input in this regard.
>>>>
>>>> --
>>>> Thanks,
>>>>
>>>> Soumen
>>>>
>>>
>>>
>>
>

Re: Apache Pig Integration with Phoenix

Posted by Alex Warshavsky <aw...@salesforce.com>.
James,

Is the async UPSERT SELECT route through the query server an option now? Or
is that something that will have to be developed over time?

Cheers,
--Alex.

On Fri, Sep 4, 2015 at 10:44 PM, James Taylor <ja...@apache.org>
wrote:

> Hi Alex,
> You can execute that pig script in a single UPSERT SELECT call in Phoenix
> and it'll perform far better. What's the motivation for using Pig? If the
> answer is "I don't want to tie up client threads for that amount of time",
> then one answer to that is to use our query server[1]. That'd still use one
> client thread, but with an UPSERT SELECT you'd be ok running it
> asynchronously (please file a JIRA for this if you're interested in
> pursuing it).
>
> FYI, our Pig integration does not support any aggregation in the LOAD
> command as Phoenix expects to do the final merge. It'd be interesting to
> explore being able to invoke the server-side aggregation and have the
> client-side final merge be done by Pig. I believe Pig exposes an API for
> that, but Phoenix doesn't have an API for a partial aggregation. We're
> looking into integrating with Apache Drill and I suspect this kind of thing
> will be possible through our Calcite integration.
>
> Thanks,
> James
>
> [1] https://phoenix.apache.org/server.html

Re: Apache Pig Integration with Phoenix

Posted by James Taylor <ja...@apache.org>.
Hi Alex,
You can execute that pig script in a single UPSERT SELECT call in Phoenix
and it'll perform far better. What's the motivation for using Pig? If the
answer is "I don't want to tie up client threads for that amount of time",
then one answer to that is to use our query server[1]. That'd still use one
client thread, but with an UPSERT SELECT you'd be ok running it
asynchronously (please file a JIRA for this if you're interested in
pursuing it).
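
For illustration, the rollup written as one UPSERT SELECT could look roughly
like the statement built in this Python sketch. The METRICS column name
LOGIN_COUNT and the helper build_rollup_sql are assumptions, not something
from the thread; the table names and the date bounds mirror the Pig script,
with the dates written as TO_DATE literals. The resulting string would be
handed to whichever Phoenix client is in use.

```python
from datetime import date


def build_rollup_sql(start: date, end: date) -> str:
    """Return an UPSERT SELECT counting logins per tenant/user between start and end."""
    # LOGIN_COUNT is an assumed column name for the METRICS table.
    return (
        "UPSERT INTO METRICS (TENANT_ID, USER_ID, LOGIN_COUNT) "
        "SELECT TENANT_ID, USER_ID, COUNT(*) FROM LOGINS "
        f"WHERE EVENT_TIME > TO_DATE('{start.isoformat()}', 'yyyy-MM-dd') "
        f"AND EVENT_TIME < TO_DATE('{end.isoformat()}', 'yyyy-MM-dd') "
        "GROUP BY TENANT_ID, USER_ID"
    )


sql = build_rollup_sql(date(2015, 8, 1), date(2015, 8, 2))
print(sql)
```

The GROUP BY covers every tenant in one statement, so no per-tenant loop is
needed in this form either.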

FYI, our Pig integration does not support any aggregation in the LOAD
command as Phoenix expects to do the final merge. It'd be interesting to
explore being able to invoke the server-side aggregation and have the
client-side final merge be done by Pig. I believe Pig exposes an API for
that, but Phoenix doesn't have an API for a partial aggregation. We're
looking into integrating with Apache Drill and I suspect this kind of thing
will be possible through our Calcite integration.
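
A rough Python sketch of the split between a partial aggregation and a final
merge described above. It only illustrates the idea: the function names
partial_count and final_merge and the sample rows are hypothetical, and
neither Phoenix nor Pig exposes functions by these names.

```python
from collections import Counter


def partial_count(rows):
    """Partial aggregate for one chunk: count per (TENANT_ID, USER_ID)."""
    return Counter(rows)


def final_merge(partials):
    """Final merge: sum the partial counts that share a key."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts instead of replacing them
    return dict(total)


# Two hypothetical chunks of rows, as if aggregated separately.
chunk_a = [("t1", "u1"), ("t1", "u1"), ("t2", "u9")]
chunk_b = [("t1", "u1"), ("t2", "u9")]

partials = [partial_count(chunk_a), partial_count(chunk_b)]
merged = final_merge(partials)
print(merged[("t1", "u1")], merged[("t2", "u9")])
```

COUNT merges by simple addition. A distinct count would need each partial
result to carry the set of values seen rather than a single number.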

Thanks,
James

[1] https://phoenix.apache.org/server.html
