You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/03/15 22:07:00 UTC

[jira] [Updated] (BEAM-10382) Python SDK should set merge status to "ALREADY_MERGED"

     [ https://issues.apache.org/jira/browse/BEAM-10382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-10382:
-----------------------------------
    Summary: Python SDK should set merge status to "ALREADY_MERGED"  (was: The Python SDK is not handling the sessions properly.)

> Python SDK should set merge status to "ALREADY_MERGED"
> ------------------------------------------------------
>
>                 Key: BEAM-10382
>                 URL: https://issues.apache.org/jira/browse/BEAM-10382
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-core
>    Affects Versions: 2.20.0, 2.22.0
>         Environment: Direct Runner and Dataflow
>            Reporter: Oscar Rodriguez
>            Priority: P2
>              Labels: session, windowing
>
> I tried this with Apache Beam 2.20 and 2.22
> 1) What I want to achieve:
> I have a pipeline that is reading from Google Pub/Sub. The messages have user and product information. In the end, I need to analyse the data so I can know, for each user's session, how many products of each type there are.
> 2) What I did:
> The first thing I do in my pipeline is a "Group by Key", using the user as a key and using "beam.WindowInto(beam.window.Sessions(15))" as windows. Then, as I need to aggregate over products for each user/session, I do another "Group by Key", this time with the product as key.
> 3) What I expect to happen:
> With the first "Group by key", the pipeline creates a different window for each user/session combination. So, for the second "Group by key", I expect that it doesn't
> mix elements that come from different windows.
> 4) What actually happens:
> If the messages are at least 1 second apart from each other, the pipeline works as I expect.
> However, if I publish all the messages at the same time, all the sessions and users get mixed.
> Here https://github.com/Oscar-Rod/apache-beam-testing you have a complete working example.
> To publish the messages I do the following:
>  
> {code:java}
> def generate_message(user, products):
>  return json.dumps({"user": user, "products": products,}).encode("utf-8")
> messages = [
>  generate_message("user_1", [{"id": "prod_1", "quantity": 1,}]),
>  generate_message("user_1", [{"id": "prod_1", "quantity": 1,}]),
>  generate_message("user_1", [{"id": "prod_1", "quantity": 1,}]),
>  generate_message("user_2", [{"id": "prod_1", "quantity": 2,}]),
>  generate_message("user_2", [{"id": "prod_1", "quantity": 2,}]),
>  generate_message("user_2", [{"id": "prod_1", "quantity": 2,}]),
>  generate_message("user_3", [{"id": "prod_1", "quantity": 3,}]),
>  generate_message("user_3", [{"id": "prod_1", "quantity": 3,}]),
>  generate_message("user_3", [{"id": "prod_1", "quantity": 3,}]),
> ]
> for message in messages:
>  publisher.publish(topic_path, data=message)
>  # time.sleep(1)
> {code}
>  
> This will publish 9 messages. As the sessions are configured with a length of 15 seconds, it should create one session for each user. In the end, the user 1 should have 3 "prod_1", the user 2 should have 6 "prod_1" and the user 3 should have 9 "prod_1".
> The first step in the pipeline is reading from Pub/Sub:
>  
> {code:java}
> messages = (
>  pipeline
>  | "read messages" >> beam.io.ReadFromPubSub(topic=options.input_topic)
>  | "parse to messages" >> beam.ParDo(ParseMessage())
>  ){code}
>  
> It will parse the messages to the following Class:
>  
> {code:java}
> class Product(BaseModel):
>  id: str
>  quantity: str
> class Message(BaseModel):
>  user: str
>  products: List[Product]
>  timestamp: datetime
> {code}
>  
> Then, I apply the sessions and the Group By Key:
>  
> {code:java}
> sessions = (
>  messages
>  | "window" >> beam.WindowInto(beam.window.Sessions(15))
>  | "add key" >> beam.Map(lambda element: (element.user, element.products))
>  | "group by user" >> beam.GroupByKey()
>  ){code}
> After this, I am getting the following elements:
>  
> {code:java}
> ('user_1', [[Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')], [Product(id='prod_1', quantity='1')]])
> ('user_2', [[Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')], [Product(id='prod_1', quantity='2')]])
> ('user_3', [[Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')], [Product(id='prod_1', quantity='3')]])
> {code}
>  
> To aggregate for each product, I need the product as a key, so I modified the previous step to flatten the elements:
>  
> {code:java}
> def flat_function(key, elements):
>  for element in elements:
>  yield (key, element)
> sessions = (
>  messages
>  | "window" >> beam.WindowInto(beam.window.Sessions(15))
>  | "add key" >> beam.Map(lambda element: (element.user, element.products))
>  | "group by user" >> beam.GroupByKey()
>  | "first flatten" >> beam.FlatMapTuple(flat_function)
>  | "second flatten" >> beam.FlatMapTuple(flat_function)
>  )
> {code}
>  
> And I am getting the following:
>  
> {code:java}
> ('user_1', Product(id='prod_1', quantity='1'))
> ('user_1', Product(id='prod_1', quantity='1'))
> ('user_1', Product(id='prod_1', quantity='1'))
> ('user_2', Product(id='prod_1', quantity='2'))
> ('user_2', Product(id='prod_1', quantity='2'))
> ('user_2', Product(id='prod_1', quantity='2'))
> ('user_3', Product(id='prod_1', quantity='3'))
> ('user_3', Product(id='prod_1', quantity='3'))
> ('user_3', Product(id='prod_1', quantity='3'))
> {code}
>  
> Now, the last step:
>  
> {code:java}
> products = (
>  sessions
>  | "add new key" >> beam.Map(lambda session: (session[1].id, (session[1], session[0])))
>  | "group by product" >> beam.GroupByKey()
>  ){code}
> And here is where the issue happens. If the messages are published at least 1 second apart, this is what I get:
>  
> {code:java}
> ('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1')])
> ('prod_1', [(Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2')])
> ('prod_1', [(Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')])
> {code}
>  
> The result is what I expect, 3 elements, one per each user's session. And looking at the "quantity" we can confirm that the result is correct. All elements with "quantity=3" are in the same element, as they come from the same user/session. The same applies to the elements with "quantity=2" and "quantity=1".
> However, if I publish the messages all at the same time, this is what I get:
>  
> {code:java}
> ('prod_1', [(Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='1'), 'user_1'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='2'), 'user_2'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3'), (Product(id='prod_1', quantity='3'), 'user_3')]){code}
> Only 1 element, with all the messages in it. So clearly, when the timestamp of the messages is too close, Apache Beam can't put them in different sessions.
> The fact the the behaviour of the pipeline changes when the timestamp of the messages changes, makes me think that this is a bug in Apache Beam. What do you think? Is it possible? Does anyone have an explanation as to why this happens? Can this somehow be expected behaviour?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)