Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/08/29 21:16:20 UTC

[GitHub] [beam] lukecwik opened a new issue, #22945: [Bug]: PCollectionView.listView is incompatible with discarding triggers that fire multiple times.

lukecwik opened a new issue, #22945:
URL: https://github.com/apache/beam/issues/22945

   ### What happened?
   
   `PCollectionView.listView` materialization for streaming runners produces this graph segment:
   User transforms -> [ToListViewDoFn](https://github.com/apache/beam/blob/4ae54b2e1e28096f2b173d5f5574910e8cfd80c9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java#L284) -> [Combine.globally(Concatenate)](https://github.com/apache/beam/blob/243128a8fc52798e1b58b0cf1a271d95ee7aa241/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/StreamingViewOverrides.java#L67)
   
   The issue is that the output from `ToListViewDoFn` is split across multiple panes for the `Combine.globally`. As a result, a pane can contain the metadata records described below while some of the records they cover are missing. On runners that preserve key order, earlier records that should already have been written may be absent; on runners that do not preserve key order, arbitrary records may be absent.
   
   The workaround is either to avoid using `PCollectionView.listView` with a windowing strategy that discards fired panes and fires multiple times, or, if using the slowly changing side input pattern, to migrate from `PCollectionView.listView` to a singleton.
   ---
   
   The `ToListViewDoFn` generates a starting `index` in the range `[Long.MIN_VALUE + 1, Long.MAX_VALUE - Integer.MAX_VALUE]` for each unique window and tags each value with it, incrementing the index after every value. For example:
   elements A, B, C -> generate random index of 5 -> produces `KV<5, A>, KV<6, B>, KV<7, C>` and a metadata record `KV<Long.MIN_VALUE, [5, 8)>`.
   If there is another bundle:
   elements X, Y, Z -> generate random index of 7 -> produces `KV<7, X>, KV<8, Y>, KV<9, Z>` and a metadata record `KV<Long.MIN_VALUE, [7, 10)>`.
   This would create a multimap like:
   ```
   Long.MIN_VALUE -> {[5,8), [7,10)}
   5 -> {A}
   6 -> {B}
   7 -> {C, X}
   8 -> {Y}
   9 -> {Z}
   ```
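
The index assignment described above can be sketched in plain Java, without any Beam dependency. This is only an illustration and not the actual `ToListViewDoFn`: the class and method names are hypothetical, the start index is passed in rather than drawn at random so the result is deterministic, and metadata ranges are stored as strings purely for display.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

class ListViewIndexing {
  // Key under which metadata records (index ranges) are stored.
  static final long METADATA_KEY = Long.MIN_VALUE;

  // Adds one bundle: each element gets the next index, starting at startIndex,
  // and one metadata record [startIndex, startIndex + n) is added.
  // Assumption: the start index is a parameter here; the issue describes it as random.
  static void addBundle(
      TreeMap<Long, List<String>> multimap, long startIndex, List<String> elements) {
    long index = startIndex;
    for (String element : elements) {
      multimap.computeIfAbsent(index, k -> new ArrayList<>()).add(element);
      index++;
    }
    // Simplification: the range is kept as a string so it can share the value type.
    String range = "[" + startIndex + "," + index + ")";
    multimap.computeIfAbsent(METADATA_KEY, k -> new ArrayList<>()).add(range);
  }

  public static void main(String[] args) {
    TreeMap<Long, List<String>> multimap = new TreeMap<>();
    addBundle(multimap, 5, Arrays.asList("A", "B", "C"));
    addBundle(multimap, 7, Arrays.asList("X", "Y", "Z"));
    multimap.forEach((key, values) -> System.out.println(key + " -> " + values));
  }
}
```

Because both bundles write to key `7`, that key ends up holding two values, which is why the access path needs the range-to-count mapping.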
   
   This allows us to produce a mapping of range to # elements like:
   ```
   [5, 7) -> 1
   [7, 8) -> 2
   [8, 10) -> 1
   ```
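
One way to derive this mapping from the metadata ranges is a sweep over the range boundaries. The following is a minimal sketch under that assumption, not necessarily how the Beam access code does it; `RangeCounts` and `countOverlaps` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RangeCounts {
  // Splits possibly overlapping half-open ranges {start, end} into disjoint
  // segments and counts how many input ranges cover each segment.
  // Returns rows of {segmentStart, segmentEnd, count}, ordered by segmentStart.
  static List<long[]> countOverlaps(List<long[]> ranges) {
    // +1 at every range start, -1 at every range end.
    TreeMap<Long, Integer> deltas = new TreeMap<>();
    for (long[] r : ranges) {
      deltas.merge(r[0], 1, Integer::sum);
      deltas.merge(r[1], -1, Integer::sum);
    }
    List<long[]> segments = new ArrayList<>();
    int active = 0; // number of ranges covering the segment that starts at 'previous'
    Long previous = null;
    for (Map.Entry<Long, Integer> e : deltas.entrySet()) {
      if (previous != null && active > 0) {
        segments.add(new long[] {previous, e.getKey(), active});
      }
      active += e.getValue();
      previous = e.getKey();
    }
    return segments;
  }

  public static void main(String[] args) {
    List<long[]> ranges = new ArrayList<>();
    ranges.add(new long[] {5, 8});
    ranges.add(new long[] {7, 10});
    for (long[] s : countOverlaps(ranges)) {
      System.out.println("[" + s[0] + ", " + s[1] + ") -> " + s[2]);
    }
  }
}
```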
   
   Using this, we can compute a global position by summing `(range.end - range.start) * # elements` over the ordered list of ranges until we reach the first range whose cumulative total exceeds the index. Once we have this range, we can compute a sub-position within it. For example, to find index 3, we would iterate over the range `[5, 7) -> 1` and know that we have advanced 2 elements `((7-5)*1)`. Then we would see that `[7, 8)` has 2 elements in it, so there is no need to advance further: our position is `7` and subPosition is `1`, telling us to access key `7` and iterate over the 0-based iterable until we reach position `1`, returning `X`.
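
The lookup walk-through above can be written as a short routine. This is a sketch under the assumption that the ranges have already been reduced to disjoint `{start, end, count}` rows; `ListViewLookup` and `get` are hypothetical names, not the SDK's access code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ListViewLookup {
  // Resolves a 0-based global index to an element.
  // segments: disjoint rows of {start, end, elementsPerKey}, ordered by start.
  // records: key -> values stored under that key.
  static String get(long globalIndex, List<long[]> segments, Map<Long, List<String>> records) {
    long skipped = 0; // elements covered by the segments already passed
    for (long[] s : segments) {
      long size = (s[1] - s[0]) * s[2];
      if (globalIndex < skipped + size) {
        long offset = globalIndex - skipped;
        long position = s[0] + offset / s[2];
        int subPosition = (int) (offset % s[2]);
        List<String> values = records.get(position);
        if (values == null || values.size() <= subPosition) {
          // The metadata promises a record that this pane does not contain.
          throw new IllegalStateException(
              "missing record at key " + position + ", subPosition " + subPosition);
        }
        return values.get(subPosition);
      }
      skipped += size;
    }
    throw new IndexOutOfBoundsException("index " + globalIndex);
  }

  public static void main(String[] args) {
    List<long[]> segments = new ArrayList<>();
    segments.add(new long[] {5, 7, 1});
    segments.add(new long[] {7, 8, 2});
    segments.add(new long[] {8, 10, 1});
    Map<Long, List<String>> records = new TreeMap<>();
    records.put(5L, Arrays.asList("A"));
    records.put(6L, Arrays.asList("B"));
    records.put(7L, Arrays.asList("C", "X"));
    records.put(8L, Arrays.asList("Y"));
    records.put(9L, Arrays.asList("Z"));
    System.out.println(get(3, segments, records));
  }
}
```

If a pane carries the metadata but lacks, say, `X` under key `7`, the same call hits the `IllegalStateException` branch, which is the inconsistency this issue describes.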
   
   ### Issue Priority
   
   Priority: 2
   
   ### Issue Component
   
   Component: sdk-java-core


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] lukecwik commented on issue #22945: [Bug]: PCollectionView.listView is incompatible with discarding triggers that fire multiple times.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on issue #22945:
URL: https://github.com/apache/beam/issues/22945#issuecomment-1234919430

   One idea for a fix is to replace the concatenate combiner with one that outputs both the records and the metadata record for the PCollection list view, ensuring that the metadata record only exists for the set of records that are being written. This would work for both accumulating and discarding mode. We could also ensure that there is only a single metadata record for all the values, which would greatly simplify the access code: with only one metadata record, we would no longer need to map from index to position and sub-position.
   
   This should be compatible with pipeline update, since the coder we already use encodes both metadata records and records.
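
To illustrate why a single metadata record simplifies access, here is a minimal plain-Java sketch of the idea. It is not the proposed combiner itself, which would be a Beam combiner; `SingleRangeListView` is a hypothetical name, and the start index is passed in for determinism.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SingleRangeListView {
  final long start; // the single metadata record is [start, end)
  final long end;
  final Map<Long, String> records = new HashMap<>();

  // Writes the pane's values under consecutive keys and derives the one
  // metadata range from exactly the records that were written.
  SingleRangeListView(long start, List<String> paneValues) {
    long index = start;
    for (String value : paneValues) {
      records.put(index++, value);
    }
    this.start = start;
    this.end = index;
  }

  // With a single range, global index i lives at key start + i,
  // so no position/sub-position mapping is needed.
  String get(long globalIndex) {
    if (globalIndex < 0 || globalIndex >= end - start) {
      throw new IndexOutOfBoundsException("index " + globalIndex);
    }
    return records.get(start + globalIndex);
  }

  public static void main(String[] args) {
    SingleRangeListView view =
        new SingleRangeListView(5, Arrays.asList("A", "B", "C", "X", "Y", "Z"));
    System.out.println(view.get(3));
  }
}
```

Since the metadata range is computed from the values actually emitted in the pane, it cannot describe records that the pane does not contain.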

