Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/09/13 17:04:00 UTC

[jira] [Work logged] (BEAM-5690) Issue with GroupByKey in BeamSql using SparkRunner

     [ https://issues.apache.org/jira/browse/BEAM-5690?focusedWorklogId=312180&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-312180 ]

ASF GitHub Bot logged work on BEAM-5690:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Sep/19 17:03
            Start Date: 13/Sep/19 17:03
    Worklog Time Spent: 10m 
      Work Description: bmv126 commented on pull request #9567: [BEAM-5690] Fix Zero value issue with GroupByKey/CountByKey in SparkRunner
URL: https://github.com/apache/beam/pull/9567
 
 
   With the CountByKey transform in the SparkRunner, the timer of an expired window is not removed from the timer set.
   As a result, the key is emitted with an empty list as its value:
   2019-09-12 21:46:57,384 [Executor task launch worker for task 510] TRACE org.apache.beam.runners.spark.stateful.SparkGroupAlsoByWindowViaWindowSet  - SqlTransform/BeamAggregationRel_45/Group.CombineFieldsByFields/Group.ByFields/GroupByKey: output elements are TimestampedValueInSingleWindow{value=KV{Row:[90001], []}, timestamp=2019-09-12T16:15:59.999Z, window=[2019-09-12T16:15:00.000Z..2019-09-12T16:16:00.000Z), pane=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}}
   
   This causes zero values to be emitted whenever the watermark advances for the key.
   
   The issue can also be reproduced using BeamSql (GroupByKey) with the SparkRunner.
   
   
   The fix checks whether a timer has passed the window boundary plus the allowed lateness; if so, the timer is
   removed from SparkTimerInternals.
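
For illustration only, the expiry check described above can be sketched in plain Java. This is not the code from the pull request: `ExpiredTimerSketch`, `WindowTimer`, `isExpired` and `removeExpired` are hypothetical names, and the model assumes a timer is identified by the end of its window and that "crossed" means the watermark is strictly after the window end plus the allowed lateness.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative model only; none of these types are Beam or SparkRunner classes.
final class ExpiredTimerSketch {

    /** A timer registered for one window, identified here by the window's end (assumption). */
    static final class WindowTimer {
        final Instant windowEnd;

        WindowTimer(Instant windowEnd) {
            this.windowEnd = windowEnd;
        }
    }

    /** True once the watermark is strictly after the window end plus the allowed lateness. */
    static boolean isExpired(Instant windowEnd, Duration allowedLateness, Instant watermark) {
        return watermark.isAfter(windowEnd.plus(allowedLateness));
    }

    /** Removes timers of expired windows in place and returns how many were removed. */
    static int removeExpired(List<WindowTimer> timers, Duration allowedLateness, Instant watermark) {
        int removed = 0;
        for (Iterator<WindowTimer> it = timers.iterator(); it.hasNext();) {
            if (isExpired(it.next().windowEnd, allowedLateness, watermark)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        List<WindowTimer> timers = new ArrayList<>();
        timers.add(new WindowTimer(Instant.parse("2019-09-12T16:16:00Z")));
        timers.add(new WindowTimer(Instant.parse("2019-09-12T16:17:00Z")));

        // The watermark has passed the first window's end but not the second's.
        Instant watermark = Instant.parse("2019-09-12T16:16:30Z");
        int removed = removeExpired(timers, Duration.ZERO, watermark);
        System.out.println(removed + " removed, " + timers.size() + " kept");
    }
}
```

In this simplified model, a timer that stays in the set after its window has expired would fire again on each later watermark advance against state that is already empty, which is consistent with the repeated empty-list outputs shown in the trace above.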
   
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 312180)
    Remaining Estimate: 0h
            Time Spent: 10m

> Issue with GroupByKey in BeamSql using SparkRunner
> --------------------------------------------------
>
>                 Key: BEAM-5690
>                 URL: https://issues.apache.org/jira/browse/BEAM-5690
>             Project: Beam
>          Issue Type: Task
>          Components: runner-spark
>            Reporter: Kenneth Knowles
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Reported on user@
> {quote}We are trying to set up a pipeline using BeamSql, and the trigger used is the default one (fires after the watermark crosses the window). 
> Below is the pipeline:
>   
>    KafkaSource (KafkaIO) 
>        ---> Windowing (FixedWindow 1min)
>        ---> BeamSql
>        ---> KafkaSink (KafkaIO)
>                          
> We are using Spark Runner for this. 
> The BeamSql query is:
> {code}select Col3, count(*) as count_col1 from PCOLLECTION GROUP BY Col3{code}
> We are grouping by Col3 which is a string. It can hold values string[0-9]. 
>              
> The records are emitted to the Kafka sink every 1 min, but the output records in Kafka are not as expected.
> Below is the output observed: (WST and WET are indicators for window start time and window end time)
> {code}
> {"count_col1":1,"Col3":"string5","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":3,"Col3":"string7","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":2,"Col3":"string8","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":1,"Col3":"string2","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":1,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0000  +0000"}
> {"count_col1":0,"Col3":"string6","WST":"2018-10-09  09-55-00 0000  +0000","WET":"2018-10-09  09-56-00 0}
> {code}
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)