Posted to user@beam.apache.org by Goutham Miryala <go...@chartboost.com> on 2023/11/02 19:52:54 UTC

Fwd: Count(distinct) not working in beam sql

Hey Team,

We're trying to implement an aggregation over *several trillion rows*
using Apache Beam SQL.
However, I'm getting an exception:
Exception in thread "main" java.lang.UnsupportedOperationException: Does not support COUNT DISTINCT

Here's the code for doing the aggregation:

PCollection<Row> aggregate = joinedCollection.apply("Aggregation",
        SqlTransform.query("SELECT" +
                "        exchange_name as adexchange," +
                "        strategy," +
                "        platform," +
                "        segment," +
                "        auction_type," +
                "        placement_type," +
                "        country," +
                "        COALESCE(loss, 0) AS loss_code," +
                "        COUNT(DISTINCT identifier) AS uniques," +
                "        no_bid_reason," +
                "        SUM(1) AS auctions," +
                "        SUM(CASE WHEN cpm_bid > 0 THEN 1 ELSE 0 END) AS bids," +
                "        SUM(cpm_bid) AS total_bid_price," +
                "        SUM(CASE WHEN loss = 0 THEN 1 END) AS wins," +
                "        app_bundle AS app_bundle," +
                "        model_id AS model_id," +
                "        identifier_type AS identifier_type," +
                "        promotion_id AS promotion_id," +
                "        sub_floor_bid_min_price_cohort AS sub_floor_bid_min_price_cohort," +
                "        bf_match_experiment AS bf_match_experiment," +
                "        bep_matched_floor AS bep_matched_floor," +
                "        SUM(p_ctr) AS p_ctr_total," +
                "        SUM(p_ir) AS p_ir_total," +
                "        SUM(p_cpa) AS p_cpa_total," +
                "        SUM(arppu) AS arppu_total," +
                "        SUM(spend) AS spend_total," +
                "        SUM(cpm_price) AS cpm_price_total" +
                "    FROM" +
                "        PCOLLECTION" +
                "    GROUP BY exchange_name,strategy,platform,segment,auction_type" +
                ",placement_type,country,loss,no_bid_reason,app_bundle" +
                ",model_id,identifier_type,promotion_id,sub_floor_bid_min_price_cohort" +
                ",bf_match_experiment,bep_matched_floor")
);


Can you please guide us?

Let me know in case you need any more information.

Goutham Miryala
Senior Data Engineer

<http://chartboost.com/>

Re: Count(distinct) not working in beam sql

Posted by Talat Uyarer via user <us...@beam.apache.org>.
Hi,

I saw this a little late. I implemented a custom count distinct for our
streaming use case. If you are looking for something close but not exact,
you can use my UDF. It uses the HyperLogLogPlus algorithm, which is an
efficient and scalable way to estimate cardinality with a controlled
level of accuracy.

I put it in a GitHub Gist:
https://gist.github.com/talatuyarer/eb3a3796dcc93ede8f99d61961934a34
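
For readers who want a feel for how this family of estimators works, here
is a minimal sketch of plain HyperLogLog in Java. It is not the code from
the gist, and it is not HyperLogLogPlus, which as far as I know adds a
sparse representation and bias correction on top of this. The class name
HllSketch, the hash function, and the precision bounds are assumptions
chosen for illustration only.

```java
import java.nio.charset.StandardCharsets;

/** Minimal HyperLogLog cardinality estimator (illustrative; hypothetical class). */
class HllSketch {
    private final int p;            // precision: number of bits used as register index
    private final int m;            // number of registers, 2^p
    private final byte[] registers; // max "rank" seen per register

    HllSketch(int p) {
        // Bounds are an assumption; the alpha formula below needs m >= 128.
        if (p < 7 || p > 16) throw new IllegalArgumentException("p must be in [7, 16]");
        this.p = p;
        this.m = 1 << p;
        this.registers = new byte[m];
    }

    /** 64-bit FNV-1a followed by a mixing finalizer; any well-mixed 64-bit hash would do. */
    static long hash64(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        h ^= h >>> 33;
        h *= 0xff51afd7ed558ccdL;
        h ^= h >>> 33;
        h *= 0xc4ceb9fe1a85ec53L;
        h ^= h >>> 33;
        return h;
    }

    /** Adding the same value twice leaves the sketch unchanged. */
    void add(String value) {
        long h = hash64(value);
        int idx = (int) (h >>> (64 - p));   // top p bits pick the register
        long rest = h << p;                 // remaining bits, left-aligned
        int rank = Math.min(Long.numberOfLeadingZeros(rest) + 1, 64 - p + 1);
        if (rank > registers[idx]) registers[idx] = (byte) rank;
    }

    /** Register-wise max; this is what makes the sketch combinable across workers. */
    void merge(HllSketch other) {
        if (other.p != p) throw new IllegalArgumentException("precision mismatch");
        for (int i = 0; i < m; i++) {
            registers[i] = (byte) Math.max(registers[i], other.registers[i]);
        }
    }

    long estimate() {
        double sum = 0;
        int zeros = 0;
        for (byte r : registers) {
            sum += Math.pow(2, -r);
            if (r == 0) zeros++;
        }
        double alpha = 0.7213 / (1 + 1.079 / m);
        double raw = alpha * m * m / sum;
        // Small-range correction: fall back to linear counting.
        if (raw <= 2.5 * m && zeros > 0) {
            return Math.round(m * Math.log((double) m / zeros));
        }
        return Math.round(raw);
    }
}
```

The point of the design is that each sketch is a fixed-size byte array
(16 KiB at p = 14) and two sketches merge by taking the per-register
maximum, so per-group state stays small no matter how many identifiers a
group sees. The expected relative error is roughly 1.04 / sqrt(m).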

On Fri, Nov 3, 2023 at 8:02 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Unfortunately, Beam SQL doesn’t support COUNT(DISTINCT) aggregation.
>
> More details about the "why" are in this discussion [1], and the related
> open issue is here [2].
>
> —
> Alexey
>
> [1] https://lists.apache.org/thread/hvmy6d5dls3m8xcnf74hfmy1xxfgj2xh
> [2] https://github.com/apache/beam/issues/19398

Re: Count(distinct) not working in beam sql

Posted by Alexey Romanenko <ar...@gmail.com>.
Unfortunately, Beam SQL doesn’t support COUNT(DISTINCT) aggregation.

More details about the "why" are in this discussion [1], and the related open issue is here [2].

—
Alexey

[1] https://lists.apache.org/thread/hvmy6d5dls3m8xcnf74hfmy1xxfgj2xh
[2] https://github.com/apache/beam/issues/19398
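
If an exact count is required, one common way to express COUNT(DISTINCT x) without that operator is in two stages: first collapse duplicate (group key, x) pairs, then count the surviving rows per group key. The sketch below shows only that logic on in-memory data in plain Java. It is not Beam code, and the class and method names are hypothetical. I have not verified whether Beam SQL accepts the equivalent nested GROUP BY query.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Two-stage exact distinct count per group (illustrative; hypothetical class). */
class TwoStageDistinct {

    /**
     * Each row is {groupKey, identifier}.
     * Stage 1 deduplicates (groupKey, identifier) pairs.
     * Stage 2 counts the remaining pairs per groupKey.
     */
    static Map<String, Long> uniquesPerGroup(List<String[]> rows) {
        // Stage 1: like SELECT k, id ... GROUP BY k, id
        Set<List<String>> deduped = new HashSet<>();
        for (String[] row : rows) {
            // COUNT(DISTINCT x) ignores NULLs, so drop them here.
            // Note: a group whose identifiers are all null is omitted
            // from the result rather than reported with a count of 0.
            if (row[1] != null) {
                deduped.add(Arrays.asList(row[0], row[1]));
            }
        }
        // Stage 2: like SELECT k, COUNT(*) ... GROUP BY k
        Map<String, Long> counts = new TreeMap<>();
        for (List<String> pair : deduped) {
            counts.merge(pair.get(0), 1L, Long::sum);
        }
        return counts;
    }
}
```

In a query like the one in this thread, the other aggregates (the SUM columns) need every input row, so the distinct count would have to be computed in a separate branch and joined back on the grouping columns. At trillions of rows the first stage still has to shuffle every distinct pair, which is the cost an approximate sketch avoids.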

