Posted to commits@beam.apache.org by "Aviem Zur (JIRA)" <ji...@apache.org> on 2017/08/24 08:39:00 UTC

[jira] [Commented] (BEAM-2789) Watermark can become unavailable for executors while it's updated with new values

    [ https://issues.apache.org/jira/browse/BEAM-2789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16139764#comment-16139764 ] 

Aviem Zur commented on BEAM-2789:
---------------------------------

While this affects the tests greatly, since they run in a single JVM, I wonder what the effect is on actual applications running in a cluster.
How likely is it that these operations have not completed by the time the executors poll this block? Also, if it does happen, what effect does this have, and is it not alleviated after one microbatch?

> Watermark can become unavailable for executors while it's updated with new values
> ---------------------------------------------------------------------------------
>
>                 Key: BEAM-2789
>                 URL: https://issues.apache.org/jira/browse/BEAM-2789
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 2.1.0
>            Reporter: Stas Levin
>            Assignee: Amit Sela
>              Labels: watermark
>
> The watermark is updated by the driver like so:
> {code:java}
> blockManager.removeBlock(WATERMARKS_BLOCK_ID, true);
> blockManager.putSingle(WATERMARKS_BLOCK_ID, newValues, StorageLevel.MEMORY_ONLY(), true);
> {code}
> However, these operations are neither synchronous nor atomic. If an executor requests the watermark values after the old values have been removed but before the new ones have been put, it may get {{null}} as a response and default to negative infinity as the watermark. This can lead to erroneous results.
> To overcome this in tests, a workaround that assumes a single-JVM setting is used. In such a setting, the watermark values are stored in a static member accessible to both the driver and the executors, bypassing the {{BlockManager#putSingle(...)}} and {{BlockManager#removeBlock(...)}} APIs.
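
A minimal sketch of the remove-then-put window described in the issue, under stated assumptions. A ConcurrentHashMap stands in for Spark's BlockManager (it is not the Spark API). Long.MIN_VALUE stands in for "negative infinity". The names WATERMARKS_BLOCK_ID, NEGATIVE_INFINITY, staticWatermark and all helper methods are hypothetical. The interleaving is replayed step by step on one thread so the outcome is deterministic. The static field only loosely models the single-JVM test workaround and is not Beam's actual code.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical model of the watermark race; a map stands in for Spark's BlockManager. */
public class WatermarkRaceSketch {
  static final String WATERMARKS_BLOCK_ID = "watermarks"; // hypothetical block key
  static final long NEGATIVE_INFINITY = Long.MIN_VALUE;   // stand-in for "negative infinity"

  // Stand-in for the block store shared by the driver and the executors (NOT the Spark API).
  static final Map<String, Long> store = new ConcurrentHashMap<>();

  // Executor-side read: a missing block (null) falls back to negative infinity.
  static long readWatermark() {
    Long value = store.get(WATERMARKS_BLOCK_ID);
    return value == null ? NEGATIVE_INFINITY : value;
  }

  // Replays the interleaving in which an executor reads between the driver's remove and put.
  static long readDuringRemoveThenPut(long newValue) {
    store.remove(WATERMARKS_BLOCK_ID);        // driver: analogous to removeBlock(...)
    long seen = readWatermark();              // executor: reads inside the gap
    store.put(WATERMARKS_BLOCK_ID, newValue); // driver: analogous to putSingle(...)
    return seen;
  }

  // Loose model of the single-JVM workaround: one static field visible to "driver" and
  // "executor" code. A single volatile reference write means a reader sees either the old
  // or the new value, with no intermediate "removed" state.
  static volatile Long staticWatermark = null;

  static void driverUpdateStatic(long newValue) {
    staticWatermark = newValue;
  }

  static long executorReadStatic() {
    Long value = staticWatermark;
    return value == null ? NEGATIVE_INFINITY : value;
  }

  public static void main(String[] args) {
    store.put(WATERMARKS_BLOCK_ID, 1_000L);
    System.out.println(readDuringRemoveThenPut(2_000L)); // -9223372036854775808
    System.out.println(readWatermark());                 // 2000
    driverUpdateStatic(5_000L);
    System.out.println(executorReadStatic());            // 5000
  }
}
{code}

In a real cluster, whether an executor actually lands in the gap depends on the timing of its polls relative to the driver's update. That is the open question raised in the comment above.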



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)