Posted to commits@samza.apache.org by "Jagadish (JIRA)" <ji...@apache.org> on 2017/04/10 19:12:41 UTC

[jira] [Assigned] (SAMZA-1202) Multiple calls to `graph.getInputStream()` with the same streamId results in non-deterministic behavior

     [ https://issues.apache.org/jira/browse/SAMZA-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jagadish reassigned SAMZA-1202:
-------------------------------

    Assignee: Jagadish

> Multiple calls to `graph.getInputStream()` with the same streamId results in non-deterministic behavior
> -------------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1202
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1202
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jagadish
>            Assignee: Jagadish
>
> Consider the following code snippet, which invokes `graph.getInputStream()` twice with the same streamId but with different message builders.
> {code}
>     // Two message builders for the same "page-views" stream.
>     BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
>     BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> Integer.valueOf(v);
>
>     // First call: the returned MessageStream is discarded.
>     graph.getInputStream("page-views", msgBuilder2);
>     // Second call: same streamId, different message builder.
>     MessageStream<String> pageViews1 = graph.getInputStream("page-views", msgBuilder1);
>     pageViews1.map(..)
>         .filter(..)
>         .window(..)
>         .sink(..);
> {code}
> TL;DR:
> The above snippet can fail to propagate messages to the operator chain, depending on the iteration order of a Java `HashMap`.
> *Here is the exact sequence:*
> 1. The user creates two `MessageStream`s by calling `graph.getInputStream()` twice with the same streamId but different message builders.
> 2. Each call invokes `StreamGraphImpl#getInputStream`, which obtains a `StreamSpec` instance for the streamId and adds it to its map.
> 3. During the wire-up of the physical DAG in `StreamOperatorTask`, `streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> ...)` is invoked.
> 4. Depending on the order in which `forEach` visits the map entries, we can end up with a different representation of the DAG, with the most recently visited `StreamSpec` clobbering the previous one.
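The sequence above can be reproduced with a small toy model. This is a sketch under stated assumptions, not Samza's code: `ClobberDemo`, `Spec`, `Stream` and `wireUp()` are hypothetical names, and the model assumes that the graph-level map is keyed by spec instances that do not override `equals`/`hashCode`, while the wire-up step re-keys the streams by streamId.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of the clobbering; these are NOT Samza's
// StreamGraphImpl, StreamSpec or StreamOperatorTask classes.
final class ClobberDemo {
    // Stand-in for StreamSpec. Assumption: no equals/hashCode override, so two
    // specs for the same streamId are distinct HashMap keys (identity hashing).
    static final class Spec {
        final String streamId;
        Spec(String streamId) { this.streamId = streamId; }
    }

    // Stand-in for an input stream; records the operators attached to it.
    static final class Stream {
        final String msgBuilderName;
        final List<String> operators = new ArrayList<>();
        Stream(String msgBuilderName) { this.msgBuilderName = msgBuilderName; }
    }

    // Graph-level map: ends up holding two entries for "page-views".
    final Map<Spec, Stream> inputStreams = new HashMap<>();

    Stream getInputStream(String streamId, String msgBuilderName) {
        Stream stream = new Stream(msgBuilderName);
        inputStreams.put(new Spec(streamId), stream);
        return stream;
    }

    // Wire-up re-keyed by streamId: whichever entry forEach visits last wins,
    // and HashMap makes no guarantee about that order.
    Map<String, Stream> wireUp() {
        Map<String, Stream> byStreamId = new HashMap<>();
        inputStreams.forEach((spec, stream) -> byStreamId.put(spec.streamId, stream));
        return byStreamId;
    }
}
```

Building a graph the way the snippet does and calling `wireUp()` yields a single `page-views` entry, but which `Stream` it holds depends on identity hash codes, so the stream carrying `pageViews1`'s operator chain can be silently dropped on some runs.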
> There are two approaches to solving this:
> *Approach 1:*
> Add validation to prevent this scenario: track the streamIds passed to `graph.getInputStream`, and throw an `IllegalArgumentException` when the same streamId is used more than once.
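A minimal sketch of Approach 1, assuming a graph that keys its registered input streams by streamId. `ValidatingStreamGraph` is a hypothetical class, and its `getInputStream` returns nothing instead of a `MessageStream` to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch of Approach 1: reject a second registration of a streamId.
final class ValidatingStreamGraph {
    // streamId -> the single message builder allowed for it.
    private final Map<String, BiFunction<String, String, ?>> inputStreams = new HashMap<>();

    // A real implementation would return a MessageStream<M>; omitted here.
    <M> void getInputStream(String streamId, BiFunction<String, String, M> msgBuilder) {
        if (inputStreams.containsKey(streamId)) {
            throw new IllegalArgumentException(
                "getInputStream() invoked multiple times with the same streamId: " + streamId);
        }
        inputStreams.put(streamId, msgBuilder);
    }
}
```

Failing fast at graph-construction time surfaces the mistake to the user immediately, at the cost of disallowing multiple views over one stream.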
> *Approach 2:*
> Maintain a `MultiMap` instead of a `HashMap` so that the latest `StreamSpec` does not clobber the earlier one.
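A minimal sketch of Approach 2. The JDK has no `MultiMap`, so this emulates one with `Map<String, List<...>>` and `computeIfAbsent` (Guava's `ListMultimap` would be an alternative). `MultiMapStreamGraph` and `onMessage` are hypothetical, and the sketch assumes that wire-up fans each incoming message out to every builder registered for its streamId.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical sketch of Approach 2: keep every registration per streamId.
final class MultiMapStreamGraph {
    // streamId -> all message builders registered for it, in registration order.
    private final Map<String, List<BiFunction<String, String, ?>>> inputStreams =
        new LinkedHashMap<>();

    <M> void getInputStream(String streamId, BiFunction<String, String, M> msgBuilder) {
        inputStreams.computeIfAbsent(streamId, id -> new ArrayList<>()).add(msgBuilder);
    }

    // Applies every registered builder to an incoming message, so no
    // registration is clobbered by a later one.
    List<Object> onMessage(String streamId, String key, String value) {
        List<Object> outputs = new ArrayList<>();
        for (BiFunction<String, String, ?> builder
                : inputStreams.getOrDefault(streamId, Collections.emptyList())) {
            outputs.add(builder.apply(key, value));
        }
        return outputs;
    }
}
```

With both builders from the snippet registered, a `page-views` message produces one output per registration, so neither operator chain is lost.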



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)