Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/03/15 22:30:00 UTC

[jira] [Work logged] (BEAM-14064) ElasticSearchIO#Write buffering and outputting across windows

     [ https://issues.apache.org/jira/browse/BEAM-14064?focusedWorklogId=741924&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-741924 ]

ASF GitHub Bot logged work on BEAM-14064:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 15/Mar/22 22:29
            Start Date: 15/Mar/22 22:29
    Worklog Time Spent: 10m 
      Work Description: egalpin opened a new pull request #17097:
URL: https://github.com/apache/beam/pull/17097


   Removes bundle-based batching for ElasticsearchIO in favour of solely supporting state-based batching (via GroupIntoBatches).  This will ensure that batching has a centralized transform for re-use, as well as ensuring that windows stay intact throughout ElasticsearchIO#write.
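
As a rough illustration of why per-window batching keeps windows intact, the sketch below keeps one buffer per window, so a flushed batch can never mix elements from window X and window Y. It is a toy model in plain Java, not how GroupIntoBatches or ElasticsearchIO is implemented; the class name `PerWindowBatcherSketch`, its methods, and the use of a `String` as a window identifier are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy batcher: one buffer per window (assumption, not Beam code). */
final class PerWindowBatcherSketch {
    private final int maxBatchSize;
    // Window identifier -> documents buffered for that window only.
    private final Map<String, List<String>> buffers = new LinkedHashMap<>();

    PerWindowBatcherSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /** Buffers a document; returns that window's batch once it is full, else an empty list. */
    List<String> add(String window, String doc) {
        List<String> buffer = buffers.computeIfAbsent(window, w -> new ArrayList<>());
        buffer.add(doc);
        if (buffer.size() < maxBatchSize) {
            return Collections.emptyList();
        }
        buffers.remove(window);
        return buffer;
    }

    /** Returns whatever is still buffered for the window, e.g. when the window expires. */
    List<String> flush(String window) {
        List<String> buffer = buffers.remove(window);
        return buffer == null ? Collections.<String>emptyList() : buffer;
    }

    public static void main(String[] args) {
        PerWindowBatcherSketch batcher = new PerWindowBatcherSketch(2);
        batcher.add("X", "a");
        batcher.add("Y", "b");
        System.out.println(batcher.add("X", "c")); // [a, c], only window X elements
        System.out.println(batcher.flush("Y"));    // [b]
    }
}
```

A single shared buffer, by contrast, would emit "a" and "b" together as soon as it reached two elements, which is the cross-window output the issue describes.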

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 741924)
    Remaining Estimate: 0h
            Time Spent: 10m

> ElasticSearchIO#Write buffering and outputting across windows
> -------------------------------------------------------------
>
>                 Key: BEAM-14064
>                 URL: https://issues.apache.org/jira/browse/BEAM-14064
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-elasticsearch
>    Affects Versions: 2.35.0, 2.36.0, 2.37.0
>            Reporter: Luke Cwik
>            Assignee: Evan Galpin
>            Priority: P2
>             Fix For: 2.38.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Source: https://lists.apache.org/thread/mtwtno2o88lx3zl12jlz7o5w1lcgm2db
> Bug PR: https://github.com/apache/beam/pull/15381
> ElasticsearchIO is collecting results from elements in window X and then trying to output them in window Y when flushing the batch. This exposed a bug where buffered elements were being output as part of a different window than the one that produced them.
> This became visible because validation was recently added to ensure that, while the pipeline is processing elements in window X, any output with a timestamp is valid for window X. Note that this validation only occurs in *@ProcessElement*, since output there is associated with the window of the input element currently being processed.
> It is ok to do this in *@FinishBundle*, since there is no existing windowing context there; when you output, the element is assigned to an appropriate window.
> *Further Context*
> We’ve bisected it to being introduced in 2.35.0, and I’m reasonably certain it’s this PR https://github.com/apache/beam/pull/15381
> Our scenario is pretty trivial: we read off Pubsub and write to Elastic in a streaming job. The configs for the source and sink are, respectively,
> {noformat}
> pipeline.apply(
>             PubsubIO.readStrings().fromSubscription(subscription)
>         ).apply(ParseJsons.of(OurObject::class.java))
>             .setCoder(KryoCoder.of())
> {noformat}
> and
> {noformat}
> ElasticsearchIO.write()
>             .withUseStatefulBatches(true)
>             .withMaxParallelRequestsPerWindow(1)
>             .withMaxBufferingDuration(Duration.standardSeconds(30))
>             // 5 bytes -> KiB -> MiB, so 5 MiB
>             .withMaxBatchSizeBytes(5L * 1024 * 1024)
>             // # of docs
>             .withMaxBatchSize(1000)
>             .withConnectionConfiguration(
>                 ElasticsearchIO.ConnectionConfiguration.create(
>                     arrayOf(host),
>                     "fubar",
>                     "_doc"
>                 ).withConnectTimeout(5000)
>                     .withSocketTimeout(30000)
>             )
>             .withRetryConfiguration(
>                 ElasticsearchIO.RetryConfiguration.create(
>                     10,
>                     // the duration is wall clock, against the connection and socket timeouts specified
>                     // above. I.e., 10 x 30s is gonna be more than 3 minutes, so if we're getting
>                     // 10 socket timeouts in a row, this would ignore the "10" part and terminate
>                     // after 6. The idea is that in a mixed failure mode, you'd get different timeouts
>                     // of different durations, and on average 10 x fails < 4m.
>                     // That said, 4m is arbitrary, so adjust as and when needed.
>                     Duration.standardMinutes(4)
>                 )
>             )
>             .withIdFn { f: JsonNode -> f["id"].asText() }
>             .withIndexFn { f: JsonNode -> f["schema_name"].asText() }
>             .withIsDeleteFn { f: JsonNode -> f["_action"].asText("noop") == "delete" }
> {noformat}
> We recently tried upgrading from 2.33 to 2.36 and immediately hit a bug in the consumer, due to alleged time skew, specifically
> {noformat}
> 2022-03-07 10:48:37.886 GMT Error message from worker: java.lang.IllegalArgumentException: Cannot output with timestamp 2022-03-07T10:43:38.640Z. Output timestamps must be no earlier than the timestamp of the 
> current input (2022-03-07T10:43:43.562Z) minus the allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc 
> for details on changing the allowed skew. 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.checkTimestamp(SimpleDoFnRunner.java:446) 
> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:422) 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn$ProcessContextAdapter.output(ElasticsearchIO.java:2364) 
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.flushAndOutputResults(ElasticsearchIO.java:2404)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOBaseFn.addAndMaybeFlush(ElasticsearchIO.java:2419)
> org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$BulkIO$BulkIOStatefulFn.processElement(ElasticsearchIO.java:2300)
> {noformat}
> I’ve bisected it: 2.34 works fine, 2.35 is the first version where this breaks, and it seems like the code in the trace was largely added by the PR linked above. The error usually claims a skew of a few seconds, but obviously I can’t override getAllowedTimestampSkew() on the internal Elastic DoFn, and it’s marked deprecated anyway.
> I’m happy to raise a JIRA but I’m not 100% sure what the code was intending to fix, and additionally, I’d also be happy if someone else can reproduce this or knows of similar reports. I feel like what we’re doing is not that uncommon a scenario, so I would have thought someone else would have hit this by now.
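
To make the reported error concrete, here is a minimal sketch of the lower-bound check the message describes, applied to the two timestamps from the log. It is a plain-Java stand-in written for illustration, not the runner's actual `checkTimestamp` code; the class `TimestampSkewSketch` and its method names are hypothetical, and the upper bound mentioned in the message is not modeled.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical stand-in for the output-timestamp validation (assumption, not Beam code). */
final class TimestampSkewSketch {

    /** True when the output timestamp is no earlier than (current input - allowed skew). */
    static boolean isOutputAllowed(Instant output, Instant currentInput, Duration allowedSkew) {
        Instant earliestAllowed = currentInput.minus(allowedSkew);
        return !output.isBefore(earliestAllowed);
    }

    /** How far the output timestamp lags behind the current input, or zero if it does not lag. */
    static Duration lag(Instant output, Instant currentInput) {
        Duration d = Duration.between(output, currentInput);
        return d.isNegative() ? Duration.ZERO : d;
    }

    public static void main(String[] args) {
        // Timestamps copied from the error message in the report.
        Instant buffered = Instant.parse("2022-03-07T10:43:38.640Z");
        Instant current = Instant.parse("2022-03-07T10:43:43.562Z");

        // With an allowed skew of 0 ms the buffered element is rejected.
        System.out.println(isOutputAllowed(buffered, current, Duration.ZERO)); // false
        // The buffered element is 4922 ms older than the element being processed.
        System.out.println(lag(buffered, current).toMillis()); // 4922
    }
}
```

The lag of roughly five seconds matches the "skew of a few seconds" in the report: an element buffered earlier is being emitted while a later element is the current input.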



--
This message was sent by Atlassian Jira
(v8.20.1#820001)