Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/04/01 19:17:00 UTC

[jira] [Commented] (BEAM-12034) beam sql union of 2 unbounded streams always performs CGBK

    [ https://issues.apache.org/jira/browse/BEAM-12034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17313390#comment-17313390 ] 

Kenneth Knowles commented on BEAM-12034:
----------------------------------------

Agreed! This is BEAM-3721 and would be a good starter bug if you want to contribute!

> beam sql union of 2 unbounded streams always performs CGBK
> ----------------------------------------------------------
>
>                 Key: BEAM-12034
>                 URL: https://issues.apache.org/jira/browse/BEAM-12034
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core
>    Affects Versions: 2.28.0
>            Reporter: luke de feo
>            Priority: P2
>
> I would like to be able to write the following SQL
>  
> {code:java}
> SELECT
>  ordersevt1.TimeStamp,
>  ordersevt1.OrderId,
>  ordersevt1.RestaurantId,
>  ordersevt1.Tenant,
>  "orderplaced" AS Topic
> FROM ordersevt1
> UNION ALL
> SELECT
>  ordersevt2.TimeStamp,
>  ordersevt2.OrderId,
>  cast(ordersevt2.Restaurant.Id as INT64) AS RestaurantId,
>  ordersevt2.Tenant,
>  "ordercreated" AS Topic
> FROM ordersevt2{code}
>  
> My use case: I have two PCollections that hold similar data in slightly different structures, and I want to write a step that transforms each into a common structure.
>  
> These transformations are completely independent, so I would expect a series of ParDo steps to transform each source PCollection, followed by something like this:
>  
> ```
> PCollectionList
>  .of(inputs)
>  .apply("flatten", Flatten.pCollections())
> ```
>  
> In reality, when I run this code I get a CoGroupByKey followed by a GroupByKey.
>  
> The relevant code seems to be in
>  
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase
>  
> {code:java}
> // TODO: We may want to preaggregate the counts first using Group instead of calling CoGroup and
> // measuring the iterable size. If on average there are duplicates in the input, this will be faster.
> final String lhsTag = "lhs";
> final String rhsTag = "rhs";
> PCollection<Row> joined =
>     PCollectionTuple.of(lhsTag, leftRows, rhsTag, rightRows)
>         .apply("CoGroup", CoGroup.join(By.fieldNames("*")));
> return joined
>     .apply(
>         "FilterResults",
>         ParDo.of(
>             new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(
>                 lhsTag, rhsTag, opType, all)))
>     .setRowSchema(joined.getSchema().getField("key").getType().getRowSchema());{code}
>  
> Is there a reason we always do a CoGroupByKey? All the extra shuffle seems wasteful.
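
To illustrate why the grouping step looks avoidable for UNION ALL specifically, here is a minimal plain-Java sketch that uses lists in place of PCollections. It is not Beam code and does not reproduce the behavior of SetOperatorFilteringDoFn; the class and method names (UnionAllSketch, unionAllByFlatten, unionAllByGrouping, unionDistinct) are hypothetical and exist only for this example. It assumes UNION ALL has plain multiset semantics, so concatenating the two inputs yields the same rows as grouping by the whole row and re-emitting each row once per occurrence. A distinct UNION, by contrast, does need to see equal rows together.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: lists stand in for PCollections, strings for Rows.
class UnionAllSketch {

    // Shuffle-free approach: UNION ALL keeps duplicates, so it is plain concatenation,
    // which is the role Flatten.pCollections() plays in the snippet from the issue.
    public static List<String> unionAllByFlatten(List<String> lhs, List<String> rhs) {
        List<String> out = new ArrayList<>(lhs);
        out.addAll(rhs);
        return out;
    }

    // Grouping approach: key every row by its full value, count occurrences per side,
    // then re-emit each row (lhsCount + rhsCount) times.
    public static List<String> unionAllByGrouping(List<String> lhs, List<String> rhs) {
        Map<String, int[]> counts = new LinkedHashMap<>();
        for (String row : lhs) {
            counts.computeIfAbsent(row, k -> new int[2])[0]++;
        }
        for (String row : rhs) {
            counts.computeIfAbsent(row, k -> new int[2])[1]++;
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, int[]> e : counts.entrySet()) {
            int total = e.getValue()[0] + e.getValue()[1];
            for (int i = 0; i < total; i++) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    // Distinct UNION: equal rows must meet somewhere to be deduplicated,
    // which is where a grouping step is genuinely required.
    public static List<String> unionDistinct(List<String> lhs, List<String> rhs) {
        return new ArrayList<>(new LinkedHashSet<>(unionAllByFlatten(lhs, rhs)));
    }

    // Helper for order-insensitive comparison of two results.
    public static List<String> sorted(List<String> rows) {
        List<String> copy = new ArrayList<>(rows);
        Collections.sort(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<String> lhs = List.of("a", "b", "a");
        List<String> rhs = List.of("b", "c");
        boolean same = sorted(unionAllByFlatten(lhs, rhs))
            .equals(sorted(unionAllByGrouping(lhs, rhs)));
        System.out.println("UNION ALL results match: " + same);
        System.out.println("UNION distinct: " + unionDistinct(lhs, rhs));
    }
}
```

Under these assumptions both UNION ALL variants return the same multiset of rows, which is why a Flatten-style plan would be sufficient for the `all` case, while the grouping path remains necessary for distinct set operations.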


