You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/10/09 19:06:00 UTC

[jira] [Work logged] (BEAM-11050) AggregatorCombiner reuses mutable accumT across multiple merges leading to incorrect results

     [ https://issues.apache.org/jira/browse/BEAM-11050?focusedWorklogId=498701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498701 ]

ASF GitHub Bot logged work on BEAM-11050:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 09/Oct/20 19:05
            Start Date: 09/Oct/20 19:05
    Worklog Time Spent: 10m 
      Work Description: lukecwik opened a new pull request #13061:
URL: https://github.com/apache/beam/pull/13061


   Accumulators can be mutated during merging by the combine fn so we must ensure that we use a unique instance of the accumulator per window.
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 498701)
    Remaining Estimate: 0h
            Time Spent: 10m

> AggregatorCombiner reuses mutable accumT across multiple merges leading to incorrect results
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11050
>                 URL: https://issues.apache.org/jira/browse/BEAM-11050
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Example failure:
>  [https://scans.gradle.com/s/lsf5y44b36pyc/tests/:runners:spark:validatesStructuredStreamingRunnerBatch/org.apache.beam.sdk.transforms.CombineTest$WindowingTests/testSlidingWindowsCombine#1]
> The test passes occassionaly and it depends on the order of merge/reduce steps that Spark does. A good run:
> {noformat}
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[c], timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[b], timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[a], timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, b}, timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, a}, timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, c}, timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> {noformat}
> Bad run:
> {noformat}
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[c], timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[b], timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, c}, timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, b}, timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInMultipleWindows{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, a}, timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] result: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)