Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/10/09 19:06:00 UTC
[jira] [Work logged] (BEAM-11050) AggregatorCombiner reuses mutable
accumT across multiple merges leading to incorrect results
[ https://issues.apache.org/jira/browse/BEAM-11050?focusedWorklogId=498701&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-498701 ]
ASF GitHub Bot logged work on BEAM-11050:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 09/Oct/20 19:05
Start Date: 09/Oct/20 19:05
Worklog Time Spent: 10m
Work Description: lukecwik opened a new pull request #13061:
URL: https://github.com/apache/beam/pull/13061
The combine fn can mutate accumulators while merging them, so we must ensure that each window gets its own unique accumulator instance.
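The hazard described here can be illustrated with a minimal, self-contained Java sketch. It does not use Beam or the Spark runner: the class name, the method names, the window labels, and the list-based accumulator are all hypothetical stand-ins chosen for illustration, and the merge function simply assumes a combine fn that mutates and returns its first argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: none of these names come from Beam or the Spark runner.
class AccumulatorAliasingSketch {

  // Stand-in for a combine fn whose merge step mutates and returns its first argument.
  static List<String> mergeInPlace(List<String> first, List<String> second) {
    first.addAll(second);
    return first;
  }

  // Assigns an accumulator to each window. With copyPerWindow == false every
  // window holds the same instance (the hazard); with true each window gets a copy.
  static Map<String, List<String>> perWindow(
      List<String> accum, List<String> windows, boolean copyPerWindow) {
    Map<String, List<String>> result = new LinkedHashMap<>();
    for (String window : windows) {
      result.put(window, copyPerWindow ? new ArrayList<String>(accum) : accum);
    }
    return result;
  }

  // Builds per-window accumulators for elements "a" and "c", then merges them
  // window by window. Only window "[1..4)" contains both elements.
  static Map<String, List<String>> combine(boolean copyPerWindow) {
    Map<String, List<String>> a = perWindow(
        new ArrayList<String>(Arrays.asList("a")),
        Arrays.asList("[-1..2)", "[0..3)", "[1..4)"),
        copyPerWindow);
    Map<String, List<String>> c = perWindow(
        new ArrayList<String>(Arrays.asList("c")),
        Arrays.asList("[1..4)", "[2..5)", "[3..6)"),
        copyPerWindow);
    for (Map.Entry<String, List<String>> entry : c.entrySet()) {
      // Windows present on both sides are merged in place; others are added as-is.
      a.merge(entry.getKey(), entry.getValue(), AccumulatorAliasingSketch::mergeInPlace);
    }
    return a;
  }

  public static void main(String[] args) {
    System.out.println("shared: " + combine(false));
    System.out.println("copied: " + combine(true));
  }
}
```

With the shared instance, merging "c" into window [1..4) also makes "c" appear in windows [-1..2) and [0..3), which never contained it. With one copy per window, only [1..4) ends up holding both elements.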
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 498701)
Remaining Estimate: 0h
Time Spent: 10m
> AggregatorCombiner reuses mutable accumT across multiple merges leading to incorrect results
> --------------------------------------------------------------------------------------------
>
> Key: BEAM-11050
> URL: https://issues.apache.org/jira/browse/BEAM-11050
> Project: Beam
> Issue Type: Bug
> Components: runner-spark
> Reporter: Luke Cwik
> Assignee: Luke Cwik
> Priority: P2
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Example failure:
> [https://scans.gradle.com/s/lsf5y44b36pyc/tests/:runners:spark:validatesStructuredStreamingRunnerBatch/org.apache.beam.sdk.transforms.CombineTest$WindowingTests/testSlidingWindowsCombine#1]
> The test passes only occasionally; the outcome depends on the order in which Spark performs the merge/reduce steps. A good run:
> {noformat}
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[c], timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[b], timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[a], timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, b}, timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, a}, timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, c}, timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> {noformat}
> Bad run:
> {noformat}
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[c], timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInMultipleWindows{value=[b], timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, c}, timestamp=1970-01-01T00:00:00.003Z, windows=[[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), [1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, b}, timestamp=1970-01-01T00:00:00.002Z, windows=[[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), [1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [] result: [TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInMultipleWindows{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}}] rval: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK reduce value: TimestampedValueInMultipleWindows{value=KV{null, a}, timestamp=1970-01-01T00:00:00.001Z, windows=[[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), [1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), [1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z)], pane=PaneInfo{isFirst=true, timing=EARLY, index=0}} accum: [TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}] result: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [] accum2: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> LCWIK merge accum1: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] accum2: [TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}] rval: [TimestampedValueInSingleWindow{value=[a, c], timestamp=1970-01-01T00:00:00.001Z, window=[1969-12-31T23:59:59.999Z..1970-01-01T00:00:00.002Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[c], timestamp=1970-01-01T00:00:00.005Z, window=[1970-01-01T00:00:00.003Z..1970-01-01T00:00:00.006Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.003Z, window=[1970-01-01T00:00:00.001Z..1970-01-01T00:00:00.004Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, c], timestamp=1970-01-01T00:00:00.004Z, window=[1970-01-01T00:00:00.002Z..1970-01-01T00:00:00.005Z), pane=PaneInfo.NO_FIRING}, TimestampedValueInSingleWindow{value=[b, a, c], timestamp=1970-01-01T00:00:00.002Z, window=[1970-01-01T00:00:00.000Z..1970-01-01T00:00:00.003Z), pane=PaneInfo.NO_FIRING}]
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)