Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/04/01 19:16:00 UTC

[jira] [Updated] (BEAM-12034) beam sql union of 2 unbounded streams always performs CGBK

     [ https://issues.apache.org/jira/browse/BEAM-12034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles updated BEAM-12034:
-----------------------------------
    Status: Open  (was: Triage Needed)

> beam sql union of 2 unbounded streams always performs CGBK
> ----------------------------------------------------------
>
>                 Key: BEAM-12034
>                 URL: https://issues.apache.org/jira/browse/BEAM-12034
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.28.0
>            Reporter: luke de feo
>            Priority: P2
>
> I would like to be able to write the following SQL
>  
> {code:java}
> SELECT
>  ordersevt1.TimeStamp,
>  ordersevt1.OrderId,
>  ordersevt1.RestaurantId,
>  ordersevt1.Tenant,
>  "orderplaced" AS Topic
> FROM ordersevt1
> UNION ALL
> SELECT
>  ordersevt2.TimeStamp,
>  ordersevt2.OrderId,
>  cast(ordersevt2.Restaurant.Id as INT64) AS RestaurantId,
>  ordersevt2.Tenant,
>  "ordercreated" AS Topic
> FROM ordersevt2{code}
>  
> The use case: I have 2 PCollections that hold similar data in slightly different structures. I want to write a step that transforms each into a common structure.
>  
> These transformations are completely independent, so I would expect a series of ParDo steps to transform each source PCollection, followed by something like this:
>  
> ```
> PCollectionList
>  .of(inputs)
>  .apply("flatten", Flatten.pCollections())
> ```
>  
> In reality, when I run this code I get a CoGroupByKey followed by a GroupByKey.
>  
> The relevant code seems to be in
>  
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase
>  
> {code:java}
> // TODO: We may want to preaggregate the counts first using Group instead of calling CoGroup and
> // measuring the
> // iterable size. If on average there are duplicates in the input, this will be faster.
> final String lhsTag = "lhs";
> final String rhsTag = "rhs";
> PCollection<Row> joined =
>  PCollectionTuple.of(lhsTag, leftRows, rhsTag, rightRows)
>  .apply("CoGroup", CoGroup.join(By.fieldNames("*")));
> return joined
>  .apply(
>  "FilterResults",
>  ParDo.of(
>  new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(
>  lhsTag, rhsTag, opType, all)))
>  .setRowSchema(joined.getSchema().getField("key").getType().getRowSchema());{code}
>  
> Is there a reason we always do a CoGroupByKey? All the extra shuffle seems wasteful.
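
For context on the question above, here is a minimal sketch of why UNION ALL should not need any grouping while UNION (distinct) does. It uses plain in-memory Java lists rather than Beam PCollections, and the class and method names (UnionSketch, unionAll, unionDistinct) are hypothetical, chosen only for illustration. It is not the Beam implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Hypothetical illustration only: in-memory lists stand in for PCollections.
class UnionSketch {

    // UNION ALL keeps every row from both inputs, duplicates included.
    // No row is ever compared with another, so no grouping is required.
    static <T> List<T> unionAll(List<T> left, List<T> right) {
        List<T> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // UNION (distinct) must bring equal rows together to drop duplicates.
    // In a distributed pipeline, this is the step that needs a shuffle.
    static <T> List<T> unionDistinct(List<T> left, List<T> right) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(left, right)));
    }

    public static void main(String[] args) {
        List<String> lhs = List.of("a", "b");
        List<String> rhs = List.of("b", "c");
        System.out.println(unionAll(lhs, rhs));      // prints [a, b, b, c]
        System.out.println(unionDistinct(lhs, rhs)); // prints [a, b, c]
    }
}
```

In Beam terms, unionAll corresponds to the Flatten.pCollections() call shown earlier in the description. Only the distinct variant, and by the same reasoning INTERSECT and EXCEPT, needs equal rows to be co-located.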



--
This message was sent by Atlassian Jira
(v8.3.4#803005)