Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2022/03/13 16:59:00 UTC
[jira] [Updated] (BEAM-12554) WriteToFiles destination no changing condition
[ https://issues.apache.org/jira/browse/BEAM-12554?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Beam JIRA Bot updated BEAM-12554:
---------------------------------
Labels: stale-assigned (was: )
> WriteToFiles destination no changing condition
> ----------------------------------------------
>
> Key: BEAM-12554
> URL: https://issues.apache.org/jira/browse/BEAM-12554
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Affects Versions: 2.27.0, 2.28.0, 2.29.0, 2.30.0, 2.35.0
> Reporter: Inigo San Jose Visiers
> Assignee: Pablo Estrada
> Priority: P1
> Labels: stale-assigned
>
> `WriteToFiles` does not appear to work as it should. From the tests I have run, my conclusion is that once a destination has been selected, it stays in effect for all following elements until the destination function returns a value that has not been seen before. This results in elements being distributed across the output files incorrectly.
> Example code 1:
> {code:python}
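> import apache_beam as beam
> from apache_beam.io import fileio
>
> with beam.Pipeline() as p:
>     (p | "Create" >> beam.Create(range(100))
>        | beam.Map(lambda x: str(x))
>        | fileio.WriteToFiles(
>            path="./dynamic/",
>            # Route "17" to "in" and every other element to "out".
>            destination=lambda n: "in" if n in ["17"] else "out",
>            sink=fileio.TextSink(),
>            file_naming=fileio.destination_prefix_naming("test")))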
> {code}
> The expected result is one file named "in-xyz" containing only "17" and another named "out-xyz" containing all the other elements. Instead, "out" contains the numbers 0 to 16, and from 17 onward every element goes to "in", which is wrong.
> Changing the number in the condition shows the same behavior.
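For reference, the expected distribution can be sketched without Beam at all. The snippet below is only an illustration of the intended semantics (the destination callable applied to each element independently); `group_by_destination` is a hypothetical helper and not part of the Beam API.

```python
from collections import defaultdict


def group_by_destination(elements, destination):
    """Group elements by the key that `destination` returns for each one.

    Hypothetical helper: it models the expected routing only and says
    nothing about how WriteToFiles is implemented.
    """
    groups = defaultdict(list)
    for element in elements:
        # The destination callable is applied to every element independently.
        groups[destination(element)].append(element)
    return dict(groups)


elements = [str(x) for x in range(100)]
expected = group_by_destination(
    elements, lambda n: "in" if n in ["17"] else "out")

print(expected["in"])        # ['17']
print(len(expected["out"]))  # 99
```

Under these semantics "in" holds a single element, whereas the observed output places every element from 17 onward there.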
> ____________________
> Example code 2:
> {code:python}
> def odd_even(x):
>     value = "even" if int(x) % 2 == 0 else "odd"
>     print(value, x)
>     return value
>
> (p | "Create" >> beam.Create(range(100))
>    | beam.Map(lambda x: str(x))
>    | fileio.WriteToFiles(
>        path="./dynamic/",
>        destination=odd_even,
>        sink=fileio.TextSink(),
>        file_naming=fileio.destination_prefix_naming("test"))
> )
> {code}
> We can see that the `odd_even` function returns the right value, but the destination is still wrong. "even" contains only 0 and "odd" contains all the remaining numbers, since the destination changed at element "1".
> ____________________
> Example code 3:
> Trying more conditionals or a different `file_naming` doesn't fix this:
> {code:python}
> def test_15(n):
>     three = "three" if int(n) % 3 == 0 else ""
>     five = "five" if int(n) % 5 == 0 else ""
>     return f"value-{three}{five}"
>
> def time_format():
>     def _inner(window, pane, shard_index, total_shards, compression, destination):
>         print(window, pane, shard_index, total_shards, compression, destination)
>         return f"dest-{destination}-shards-{shard_index}-of-{total_shards}"
>     return _inner
>
> (p | "Create" >> beam.Create(range(N))
>    | beam.Map(lambda x: str(x))
>    | fileio.WriteToFiles(
>        path="./dynamic/",
>        destination=test_15,
>        sink=fileio.TextSink(),
>        file_naming=time_format())
> )
> {code}
> Adding shards or other variables doesn't help either.
> I have tested this with different SDK versions (2.27, 2.29, 2.30) and on Dataflow, DirectRunner and InteractiveRunner.
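One way to check the written files against the destination function is sketched below. It assumes that each output file name starts with its destination key (as with the "in-xyz" / "out-xyz" names described above), that files hold one element per line, and that no destination key is a prefix of another. `find_misrouted` is a hypothetical helper, and the demo uses hand-written files in a temporary directory rather than real pipeline output.

```python
import glob
import os
import tempfile


def find_misrouted(directory, destination):
    """Return (file_name, element) pairs that landed in the wrong file.

    Assumption: a file belongs to a destination when its name starts
    with the key that `destination` returns.
    """
    misrouted = []
    for path in sorted(glob.glob(os.path.join(directory, "*"))):
        if not os.path.isfile(path):
            continue  # skip sub-directories such as temporary folders
        name = os.path.basename(path)
        with open(path) as handle:
            for line in handle:
                element = line.rstrip("\n")
                if element and not name.startswith(destination(element)):
                    misrouted.append((name, element))
    return misrouted


# Demo with hand-written files that mimic the faulty output.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "in-000"), "w") as f:
        f.write("17\n18\n")
    with open(os.path.join(tmp, "out-000"), "w") as f:
        f.write("0\n1\n")
    result = find_misrouted(
        tmp, lambda n: "in" if n in ["17"] else "out")

print(result)  # [('in-000', '18')]
```

With a custom `file_naming` such as the one in example 3, where the name begins with "dest-", the prefix check would need to be adapted.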
--
This message was sent by Atlassian Jira
(v8.20.1#820001)