Posted to user@beam.apache.org by Nick Pan <as...@gmail.com> on 2022/03/09 08:52:41 UTC

Running Query Containing Aggregation Using ElasticsearchIO Read

Hello,

I have a use case where I need to first compute an aggregation for each
key, then filter the keys based on some criteria, and finally feed the
matching keys into a PCollection using ElasticsearchIO read.  But
ElasticsearchIO does not seem to support queries that contain
aggregations:

Error message from worker: org.elasticsearch.client.ResponseException: method [GET], host [https://...], URI [...], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"parsing_exception","reason":"request does not support [aggregations]","line":1,"col":135}],"type":"parsing_exception","reason":"request does not support [aggregations]","line":1,"col":135},"status":400}
org.elasticsearch.client.RestClient.convertResponse(RestClient.java:331)
org.elasticsearch.client.RestClient.performRequest(RestClient.java:301)
org.elasticsearch.client.RestClient.performRequest(RestClient.java:276)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.queryCount(ElasticsearchIO.java:780)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.getEstimatedSizeBytes(ElasticsearchIO.java:762)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BoundedElasticsearchSource.split(ElasticsearchIO.java:710)


Here is an example of the Elasticsearch query I am trying to do:

{
  "aggs": {
    "user_id": {
      "composite": {
        "sources": [
          { "user_id": { "terms": { "field": "user_id" } } }
        ]
      },
      "aggs": {
        "min": {
          "min": {
            "field": "play_time"
          }
        },
        "max": {
          "max": {
            "field": "play_time"
          }
        },
        "diff": {
          "bucket_selector": {
            "buckets_path": {
              "min": "min",
              "max": "max"
            },
            "script": "params.max - params.min > 5000"
          }
        }
      }
    }
  }
}
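
For readers less familiar with composite aggregations, the intent of the query above can be sketched in plain Python over in-memory documents: compute the minimum and maximum play_time per user_id, then keep only the users whose spread exceeds 5000. This is only an illustration of the logic, not Elasticsearch or Beam code; the function name and the sample documents are made up for the example.

```python
def users_with_play_time_spread(docs, threshold=5000):
    """Return {user_id: (min, max)} for users whose max - min play_time exceeds threshold."""
    ranges = {}
    for doc in docs:
        user, play_time = doc["user_id"], doc["play_time"]
        # Start from (play_time, play_time) the first time a user is seen.
        lo, hi = ranges.get(user, (play_time, play_time))
        ranges[user] = (min(lo, play_time), max(hi, play_time))
    # Mirrors the bucket_selector script: params.max - params.min > 5000
    return {u: (lo, hi) for u, (lo, hi) in ranges.items() if hi - lo > threshold}


# Hypothetical sample documents, for illustration only.
docs = [
    {"user_id": "a", "play_time": 100},
    {"user_id": "a", "play_time": 6000},
    {"user_id": "b", "play_time": 200},
    {"user_id": "b", "play_time": 900},
]
result = users_with_play_time_spread(docs)
```

Here only user "a" survives the filter, since 6000 - 100 is greater than 5000 while user "b" has a spread of 700.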



Are Elasticsearch queries that contain aggregations not supported in
ElasticsearchIO?  If they are not, is there a way to work around this?


Thanks,

Nick

Re: Running Query Containing Aggregation Using ElasticsearchIO Read

Posted by Evan Galpin <ev...@gmail.com>.
Great idea, Nick. It could definitely work to transform the data within
Elasticsearch first and then effectively run a "match_all" query against the
transformed index within Beam.  Agreed that it's not ideal from a
user-experience perspective, but hopefully it serves as another potential
path to unblocking your development.

I'll also put some thought into how your requested functionality might be
supported within ElasticsearchIO.  Feel free to open a ticket in
Jira[1] to track the idea :-)

[1] https://issues.apache.org/jira/projects/BEAM/summary


Re: Running Query Containing Aggregation Using ElasticsearchIO Read

Posted by Nick Pan <as...@gmail.com>.
Hi Evan,

Thanks for the info.  Since I need to support many aggregation types,
implementing all the group-by logic in PTransforms would be a lot of work,
but I will look into the 2nd approach.

Another approach I am considering is to kick off a single batch
transform to group/aggregate/filter the data in Elasticsearch, so the
workers only need to search the result without running any
aggregation.  But this can get ugly too, because the workers need to wait
for the Elasticsearch transform to finish.
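
The waiting step mentioned above could be handled with a simple polling loop that runs before the read starts. The sketch below is plain Python and makes several assumptions: `get_state` is a hypothetical callable that returns the transform's current state as a string (for instance, one backed by the Elasticsearch transform stats endpoint), and the state names "stopped" and "failed" are assumed to mark completion and failure of a batch transform. Check both against the Elasticsearch version in use.

```python
import time


def wait_until_done(get_state, done_states=("stopped",), failed_states=("failed",),
                    poll_seconds=5.0, timeout_seconds=600.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until it reports a done state; raise on failure or timeout.

    get_state is a hypothetical callable returning the current state as a string.
    """
    deadline = clock() + timeout_seconds
    while True:
        state = get_state()
        if state in done_states:
            return state
        if state in failed_states:
            raise RuntimeError(f"transform ended in state {state!r}")
        if clock() >= deadline:
            raise TimeoutError(f"transform still in state {state!r} after timeout")
        sleep(poll_seconds)


# Demo with a canned sequence of states instead of a real cluster.
_states = iter(["started", "indexing", "stopped"])
final_state = wait_until_done(lambda: next(_states), poll_seconds=0,
                              sleep=lambda seconds: None)
```

Injecting `sleep` and `clock` keeps the loop easy to test without real delays. In a real pipeline the call would have to complete before the ElasticsearchIO read against the transformed index is started.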

Nick


Re: Running Query Containing Aggregation Using ElasticsearchIO Read

Posted by Evan Galpin <ev...@gmail.com>.
You're correct in your assessment that ElasticsearchIO does not currently
support queries with aggregations.  There's a large difference between
scrolling over large sets of documents (which has a common interface
provided by ES) and running aggregations, where user code in the query
determines the output fields.  Another major difference is that queries with
aggs effectively have a single output (the result of the aggs) rather than a
potentially huge number of documents as "hits".  However, aggs could
produce thousands of buckets, each of which you might want as an element
in a PCollection, so it could be useful to support this use case in the
future.

To work around this, two options come to mind:

   1. Transfer the filtering logic to Beam:  Read all the documents that
   you might be interested in using ElasticsearchIO with just a search (no
   aggs) and implement the per-user grouping and filtering within Beam using
   PTransforms.
   2. Write your own Source (SplittableDoFn?) which issues your query above
   (with aggs) using the Elasticsearch RestClient, parses the response, and
   outputs the parsed elements one by one to a PCollection for further
   processing. This would effectively be the approach to supporting your
   desired functionality within ElasticsearchIO as well.
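
The core of option 2 (issue the aggregation query, parse the response, emit one element per bucket) can be sketched independently of Beam. The plain-Python generator below pages through a composite aggregation by feeding each response's after_key back into the request as "after". It is a sketch under assumptions, not ElasticsearchIO or RestClient code: `search` is a hypothetical callable that takes a request body as a dict and returns the parsed JSON response as a dict, and the demo uses canned pages rather than a real cluster.

```python
import copy


def iter_composite_buckets(search, query, agg_name):
    """Yield every bucket of a composite aggregation, following after_key pages.

    search is a hypothetical callable: request body (dict) -> parsed response (dict).
    """
    body = copy.deepcopy(query)  # leave the caller's query untouched
    body["size"] = 0  # only the aggregation result is needed, not the hits
    while True:
        agg = search(body)["aggregations"][agg_name]
        yield from agg["buckets"]
        after_key = agg.get("after_key")
        if after_key is None:
            return  # no after_key: assumed to mean there are no further pages
        body["aggs"][agg_name]["composite"]["after"] = after_key


# Demo with canned responses standing in for Elasticsearch.
query = {"aggs": {"user_id": {"composite": {
    "sources": [{"user_id": {"terms": {"field": "user_id"}}}]}}}}
_pages = iter([
    {"after_key": {"user_id": "a"},
     "buckets": [{"key": {"user_id": "a"}, "doc_count": 2,
                  "min": {"value": 100.0}, "max": {"value": 6000.0}}]},
    {"after_key": {"user_id": "d"},
     "buckets": [{"key": {"user_id": "d"}, "doc_count": 3,
                  "min": {"value": 0.0}, "max": {"value": 9000.0}}]},
    {"buckets": []},  # final page: no buckets and no after_key
])


def fake_search(body):
    return {"aggregations": {"user_id": next(_pages)}}


user_ids = [b["key"]["user_id"]
            for b in iter_composite_buckets(fake_search, query, "user_id")]
```

The loop stops when a response no longer carries an after_key rather than when a page comes back empty, because a bucket_selector could plausibly filter out every bucket on a page while more pages remain; that behavior should be verified against the Elasticsearch version in use. Inside a Beam pipeline, each yielded bucket would become one output element of the custom source.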

- Evan
