Posted to issues@beam.apache.org by "Milan Nikl (Jira)" <ji...@apache.org> on 2020/09/22 08:27:00 UTC
[jira] [Updated] (BEAM-10945) ElasticsearchIO performs 0 division on DirectRunner
[ https://issues.apache.org/jira/browse/BEAM-10945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Milan Nikl updated BEAM-10945:
------------------------------
Description:
h1. Environment configuration
My company uses an [Elasticsearch cross cluster|https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html#ccs-supported-apis] setup for search. The cluster version is 7.9.1.
I intended to use ElasticsearchIO to read application logs and then produce some aggregated data.
h1. Problem description
# In a cross cluster ES setup, there is no {{/<index>/_stats}} API available, so [ElasticsearchIO#getEstimatedSizeBytes|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L692] cannot be computed properly.
# {{statsJson}} returned by the cluster looks like this:
{quote}
{
"_shards" : {
"total" : 0,
"successful" : 0,
"failed" : 0
},
"_all" : {
"primaries" : { },
"total" : { }
},
"indices" : { }
}
{quote}
# As a result, the {{totalCount}} value cannot be parsed from the JSON and is set to {{0}}.
# Consequently, the {{estimatedByteSize}} value [is set to 1|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L707] (which is itself a workaround for a similar issue).
# {{ElasticsearchIO#getEstimatedSizeBytes}} is used in [BoundedReadEvaluatorFactory#getInitialInputs|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java#L212], which does not check the value and divides two {{long}} values. With an estimated size of 1, the result is {{0}} for any {{targetParallelism > 1}}.
# Then [ElasticsearchIO#split|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L665] is called with {{indexSize = 1}} and {{desiredBundleSizeBytes = 0}}, which sets the {{nbBundlesFloat}} value to infinity.
# Even though the number of bundles is capped at {{1024}}, reading from 1024 {{BoundedElasticsearchSource}}s concurrently makes {{ElasticsearchIO}} virtually impossible to use on the direct runner.
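The arithmetic in the steps above can be reproduced in isolation. The following is a minimal sketch, not the Beam source: the class {{ZeroBundleSizeDemo}} and its method names are hypothetical, and the two methods only mirror the operations described in the report (a {{long}} division in the direct runner, then a float division, ceiling, and cap of 1024 in {{split}}).

```java
public final class ZeroBundleSizeDemo {
    // Upper bound on the number of bundles, as stated in the report.
    static final int MAX_BUNDLES = 1024;

    // Integer division of two integral values, as described for the direct runner.
    static long bytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
        return estimatedSizeBytes / targetParallelism;
    }

    // Float division, then ceiling, then the cap, as described for split.
    static int bundleCount(long indexSize, long desiredBundleSizeBytes) {
        // Dividing a float by zero does not throw in Java; it yields Infinity.
        float nbBundlesFloat = (float) indexSize / desiredBundleSizeBytes;
        // Casting an infinite double to int saturates at Integer.MAX_VALUE.
        int nbBundles = (int) Math.ceil(nbBundlesFloat);
        return Math.min(nbBundles, MAX_BUNDLES);
    }

    public static void main(String[] args) {
        long perBundle = bytesPerBundle(1L, 8);   // estimated size 1, parallelism 8
        System.out.println("bytes per bundle: " + perBundle);
        System.out.println("nbBundlesFloat:   " + ((float) 1L / perBundle));
        System.out.println("bundles:          " + bundleCount(1L, perBundle));
    }
}
```

With an estimated size of 1 and a parallelism of 8, the sketch yields 0 bytes per bundle, an infinite ratio, and therefore the maximum of 1024 bundles.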
h1. Resolution suggestion
I still haven't tested reading from ElasticsearchIO on a proper runner (we use Flink 1.10.2), so I can neither confirm nor deny that it works with our Elasticsearch setup. For now I'm just suggesting a few checks of input values so that the division by zero and the unnecessary parallelism are eliminated on the direct runner.
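One possible shape for such checks is sketched below. This is an illustration under stated assumptions, not a proposed patch: {{GuardedBundleMath}} and its methods are hypothetical names, and falling back to a single bundle when the sizes are unusable is an assumed policy rather than something Beam is known to do.

```java
public final class GuardedBundleMath {
    // Upper bound on the number of bundles, as stated in the report.
    static final int MAX_BUNDLES = 1024;

    // Never produce a desired bundle size below 1 byte.
    static long safeBytesPerBundle(long estimatedSizeBytes, int targetParallelism) {
        if (targetParallelism < 1) {
            throw new IllegalArgumentException("targetParallelism must be >= 1");
        }
        return Math.max(1L, estimatedSizeBytes / targetParallelism);
    }

    // Fall back to one bundle when either size is non-positive (assumed policy).
    static int safeBundleCount(long indexSize, long desiredBundleSizeBytes) {
        if (indexSize <= 0 || desiredBundleSizeBytes <= 0) {
            return 1;
        }
        // Ceiling division on longs, written to avoid overflow for large sizes.
        long quotient = indexSize / desiredBundleSizeBytes;
        long count = (indexSize % desiredBundleSizeBytes == 0) ? quotient : quotient + 1;
        return (int) Math.min(count, (long) MAX_BUNDLES);
    }

    public static void main(String[] args) {
        long perBundle = safeBytesPerBundle(1L, 8);
        System.out.println("bytes per bundle: " + perBundle);
        System.out.println("bundles:          " + safeBundleCount(1L, perBundle));
    }
}
```

With the same inputs as in the report (estimated size 1, parallelism 8), this variant asks for 1 byte per bundle and produces a single bundle instead of 1024.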
was:
h1. Environment configuration
In my company we use [Elasticsearch cross cluster|https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html#ccs-supported-apis] setup for search. Cluster version is 7.9.1.
I intended to use ElasticsearchIO for reading application logs and subsequently producing some aggregated data.
h1. Problem description
# In cross cluster ES setup, there is no {{/<index>/_stats}} API available, so it is not possible to compute [ElasticsearchIO#getEstimatedSizeBytes|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L692] properly.
# {{statsJson}} returned by the cluster looks like this:
{quote}
{
"_shards" : {
"total" : 0,
"successful" : 0,
"failed" : 0
},
"_all" : {
"primaries" : { },
"total" : { }
},
"indices" : { }
}
{quote}
# That means that {{totalCount}} value cannot be parsed from the json and is thus set to {{0}}.
# Which means that {{estimatedByteSize}} value [is set to 1|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L707] (Which itself is a workaround for similar issue.)
# {{ElasticsearchIO#getEstimatedSizeBytes}} is used in [BoundedReadEvaluatorFactory#getInitialInputs|https://github.com/apache/beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java#L212] which does not check the value and performs division of two {{long}} values, which of course results in {{0}} for any {{targetParallelism > 1}}.
# Then [ElasticsearchIO#split|https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L665] is called with {{indexSize = 1}} and {{desiredBundleSizeBytes = 0}}, which sets the {{nbBundlesFloat}} value to infinity.
# Even though the number of bundles is capped at {{1024}}, reading from 1024 {{BoundedElasticsearchSource}}s concurrently makes {{ElasticsearchIO}} virtually impossible to use on the direct runner.
h1. Resolution suggestion
I still haven't tested reading from ElasticsearchIO on a proper runner (we use Flink 1.10.2), so I can neither confirm nor deny that it works with our Elasticsearch setup. For now I'm just suggesting a few checks of input values so that the division by zero and the unnecessary parallelism are eliminated on the direct runner.
> ElasticsearchIO performs 0 division on DirectRunner
> ---------------------------------------------------
>
> Key: BEAM-10945
> URL: https://issues.apache.org/jira/browse/BEAM-10945
> Project: Beam
> Issue Type: Bug
> Components: io-java-elasticsearch, runner-direct
> Affects Versions: 2.23.0
> Environment: * Beam 2.23
> * Java 1.8.0_265
> * Ubuntu 16.04
> * Elastic version of cluster 7.9.1, cross cluster setup
> * Parallelism of direct runner 8
> Reporter: Milan Nikl
> Priority: P2
>