Posted to issues@beam.apache.org by "Kenneth Knowles (Jira)" <ji...@apache.org> on 2021/04/01 19:16:00 UTC
[jira] [Updated] (BEAM-12034) beam sql union of 2 unbounded streams always performs CGBK
[ https://issues.apache.org/jira/browse/BEAM-12034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles updated BEAM-12034:
-----------------------------------
Status: Open (was: Triage Needed)
> beam sql union of 2 unbounded streams always performs CGBK
> ----------------------------------------------------------
>
> Key: BEAM-12034
> URL: https://issues.apache.org/jira/browse/BEAM-12034
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.28.0
> Reporter: luke de feo
> Priority: P2
>
> I would like to be able to write the following SQL:
>
> {code:java}
> SELECT
> ordersevt1.TimeStamp,
> ordersevt1.OrderId,
> ordersevt1.RestaurantId,
> ordersevt1.Tenant,
> "orderplaced" AS Topic
> FROM ordersevt1
> UNION ALL
> SELECT
> ordersevt2.TimeStamp,
> ordersevt2.OrderId,
> cast(ordersevt2.Restaurant.Id as INT64) AS RestaurantId,
> ordersevt2.Tenant,
> "ordercreated" AS Topic
> FROM ordersevt2{code}
>
> The use case: I have two PCollections that hold similar data in slightly different structures, and I want a step that transforms each into a common structure.
>
> These transformations are completely independent, so I would expect a series of ParDo steps that transform each source PCollection, followed by something like this:
>
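> {code:java}
> // inputs: one normalized PCollection<Row> per source.
> // Flatten merges them without keying or grouping the data.
> PCollection<Row> merged =
>     PCollectionList.of(inputs)
>         .apply("flatten", Flatten.pCollections());{code}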
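To make that expectation concrete, here is a minimal plain-Java model of the plan described above, with in-memory lists standing in for PCollections. It is a sketch, not Beam code: `OrderEvt1`, `OrderEvt2`, `Restaurant` and `CommonOrder` are hypothetical types covering only a subset of the columns, and the nested restaurant id in the second source is assumed to be a string so that the SQL `cast(... as INT64)` becomes `Long.parseLong`. Each branch is an element-wise projection (what a ParDo would do), and the union is a plain concatenation, which is the role `Flatten.pCollections()` plays: no key and no grouping step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of "one ParDo per source, then Flatten".
// Lists stand in for PCollections; all types below are hypothetical, not Beam classes.
final class UnionAllModel {
  // Source 1: flat structure (subset of the ordersevt1 columns).
  static final class OrderEvt1 {
    final String orderId;
    final long restaurantId;
    OrderEvt1(String orderId, long restaurantId) {
      this.orderId = orderId;
      this.restaurantId = restaurantId;
    }
  }

  // Source 2: restaurant id nested in a sub-record; assumed to be a string here.
  static final class Restaurant {
    final String id;
    Restaurant(String id) { this.id = id; }
  }

  static final class OrderEvt2 {
    final String orderId;
    final Restaurant restaurant;
    OrderEvt2(String orderId, Restaurant restaurant) {
      this.orderId = orderId;
      this.restaurant = restaurant;
    }
  }

  // Common target shape, mirroring the SELECT lists of both branches.
  static final class CommonOrder {
    final String orderId;
    final long restaurantId;
    final String topic;
    CommonOrder(String orderId, long restaurantId, String topic) {
      this.orderId = orderId;
      this.restaurantId = restaurantId;
      this.topic = topic;
    }
    @Override
    public String toString() { return orderId + "/" + restaurantId + "/" + topic; }
  }

  // Branch 1: element-wise projection, like the first SELECT.
  static List<CommonOrder> normalize1(List<OrderEvt1> in) {
    return in.stream()
        .map(e -> new CommonOrder(e.orderId, e.restaurantId, "orderplaced"))
        .collect(Collectors.toList());
  }

  // Branch 2: element-wise projection that flattens and casts the nested id.
  static List<CommonOrder> normalize2(List<OrderEvt2> in) {
    return in.stream()
        .map(e -> new CommonOrder(e.orderId, Long.parseLong(e.restaurant.id), "ordercreated"))
        .collect(Collectors.toList());
  }

  // UNION ALL as a Flatten: concatenate, keep duplicates, no key or grouping.
  static List<CommonOrder> unionAll(List<CommonOrder> a, List<CommonOrder> b) {
    List<CommonOrder> out = new ArrayList<>(a);
    out.addAll(b);
    return out;
  }

  public static void main(String[] args) {
    List<CommonOrder> merged = unionAll(
        normalize1(List.of(new OrderEvt1("o1", 7L))),
        normalize2(List.of(new OrderEvt2("o2", new Restaurant("9")))));
    System.out.println(merged); // [o1/7/orderplaced, o2/9/ordercreated]
  }
}
```

Neither branch looks at any element other than the one it is transforming, which is why no shuffle should be necessary for this query.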
>
> In practice, when I run this query, I get a CoGroupByKey followed by a GroupByKey.
>
> The relevant code seems to be in
>
> org.apache.beam.sdk.extensions.sql.impl.rel.BeamSetOperatorRelBase
>
> {code:java}
> // TODO: We may want to preaggregate the counts first using Group instead of calling CoGroup and
> // measuring the iterable size. If on average there are duplicates in the input, this will be faster.
> final String lhsTag = "lhs";
> final String rhsTag = "rhs";
> PCollection<Row> joined =
> PCollectionTuple.of(lhsTag, leftRows, rhsTag, rightRows)
> .apply("CoGroup", CoGroup.join(By.fieldNames("*")));
> return joined
> .apply(
> "FilterResults",
> ParDo.of(
> new BeamSetOperatorsTransforms.SetOperatorFilteringDoFn(
> lhsTag, rhsTag, opType, all)))
> .setRowSchema(joined.getSchema().getField("key").getType().getRowSchema());{code}
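Judging from the quoted code, each element after the CoGroup carries one distinct row (the `key` field) together with the iterables of matching left and right rows, so the filter only needs to know how many times that row appeared on each side. The sketch below is a hedged, plain-Java model of the standard SQL multiset rules such a filter has to apply, given a left count `m` and a right count `n`. It is not Beam's `SetOperatorFilteringDoFn`; the `SetOp` enum and `copiesToEmit` method are hypothetical names.

```java
// Plain-Java model of SQL set-operator multiset semantics, given how many times a
// distinct row appears on the left (m) and on the right (n).
// SetOp and copiesToEmit are hypothetical names, not Beam's API.
final class SetOpSemantics {
  enum SetOp { UNION, INTERSECT, EXCEPT }

  // Number of copies of the row to emit for the given operator and ALL flag.
  static long copiesToEmit(SetOp op, boolean all, long m, long n) {
    switch (op) {
      case UNION:
        // UNION ALL keeps every input row; UNION keeps one copy if the row appears at all.
        return all ? m + n : (m + n > 0 ? 1 : 0);
      case INTERSECT:
        // INTERSECT ALL keeps the smaller multiplicity; INTERSECT needs the row on both sides.
        return all ? Math.min(m, n) : (m > 0 && n > 0 ? 1 : 0);
      case EXCEPT:
        // EXCEPT ALL subtracts multiplicities; EXCEPT needs the row to be absent on the right.
        return all ? Math.max(m - n, 0) : (m > 0 && n == 0 ? 1 : 0);
      default:
        throw new IllegalArgumentException("unknown op " + op);
    }
  }

  public static void main(String[] args) {
    System.out.println(copiesToEmit(SetOp.UNION, true, 2, 3));     // 5
    System.out.println(copiesToEmit(SetOp.UNION, false, 2, 3));    // 1
    System.out.println(copiesToEmit(SetOp.INTERSECT, true, 2, 3)); // 2
    System.out.println(copiesToEmit(SetOp.EXCEPT, true, 2, 3));    // 0
  }
}
```

Every case except UNION ALL depends on comparing or deduplicating counts across the two sides, which is what the grouping provides. For UNION ALL the answer, `m + n` copies, is simply every input row.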
>
> Is there a reason we always perform a CoGroupByKey? For UNION ALL no deduplication or counting is needed, so the extra shuffle seems wasteful.
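To illustrate why the shuffle looks avoidable for UNION ALL specifically, the plain-Java sketch below computes UNION ALL two ways: by grouping identical rows and re-emitting `m + n` copies (the shape of the CoGroup-then-filter plan), and by simple concatenation (the shape of a Flatten). Rows are modeled as strings and the results are compared as multisets, since PCollections carry no ordering. The class and method names are hypothetical; this is a model of the reasoning, not Beam code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Compares two ways of computing UNION ALL over in-memory "rows" (strings here).
// Class and method names are hypothetical; lists stand in for PCollections.
final class UnionAllEquivalence {
  // Shape of the CoGroup plan: count each distinct row per side, then emit m + n copies.
  static List<String> viaGrouping(List<String> left, List<String> right) {
    Map<String, long[]> counts = new HashMap<>();
    for (String r : left) counts.computeIfAbsent(r, k -> new long[2])[0]++;
    for (String r : right) counts.computeIfAbsent(r, k -> new long[2])[1]++;
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, long[]> e : counts.entrySet()) {
      long copies = e.getValue()[0] + e.getValue()[1];
      for (long i = 0; i < copies; i++) out.add(e.getKey());
    }
    return out;
  }

  // Shape of a Flatten: concatenate the inputs, with no key and no grouping.
  static List<String> viaConcatenation(List<String> left, List<String> right) {
    List<String> out = new ArrayList<>(left);
    out.addAll(right);
    return out;
  }

  // Multiset view (row -> count), since PCollections are unordered.
  static Map<String, Integer> multiset(List<String> rows) {
    Map<String, Integer> m = new HashMap<>();
    for (String r : rows) m.merge(r, 1, Integer::sum);
    return m;
  }

  public static void main(String[] args) {
    List<String> left = List.of("a", "b", "b");
    List<String> right = List.of("b", "c");
    boolean same = multiset(viaGrouping(left, right))
        .equals(multiset(viaConcatenation(left, right)));
    System.out.println(same); // true
  }
}
```

Because both paths always yield the same multiset, a planner could plausibly special-case UNION ALL to a Flatten, while plain UNION, INTERSECT and EXCEPT would still need the grouping to count duplicates.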