Posted to user@flink.apache.org by Rex Fenley <Re...@remind101.com> on 2020/12/03 18:55:10 UTC

Duplicate operators generated by plan

Hello,

I'm running into an issue where my execution plan creates the exact same
join operator multiple times, simply because the subsequent operator
filters on a different boolean value. This is a massive duplication of
storage and work. Each of the filters that follow removes only a small
set of elements, too.

For example, here are two separate operators that are identical:

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent])

Join(joinType=[InnerJoin], where=[(id = id_splatted)], select=[id,
organization_id, user_id, roles, id_splatted, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent],
leftInputSpec=[JoinKeyContainsUniqueKey],
rightInputSpec=[JoinKeyContainsUniqueKey]) -> Calc(select=[id,
organization_id, user_id, roles AS org_user_roles, org_user_is_admin,
org_user_is_teacher, org_user_is_student, org_user_is_parent])

Both process exactly the same dataset.

The first one points to
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS admin_organization_ids])

The second one points to
GroupAggregate(groupBy=[user_id], select=[user_id,
IDsAgg_RETRACT(organization_id) AS TMP_0]) -> Calc(select=[user_id,
TMP_0.f0 AS teacher_organization_ids])

These two aggregates cover overlapping but slightly different sets of data. I
don't see why that would split the single upstream join into two, though.
There's even a case where I'm seeing a join tripled.

Is there a good reason why this should happen? Is there a way to tell Flink
not to duplicate operators where it doesn't need to?

Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com <https://www.remind.com/>  |  BLOG <http://blog.remind.com/>
FOLLOW US <https://twitter.com/remindhq>  |  LIKE US <https://www.facebook.com/remindhq>

Re: Re: Duplicate operators generated by plan

Posted by Rex Fenley <Re...@remind101.com>.
Version 1.11.2

On Sun, Dec 6, 2020 at 10:20 PM Yun Gao <yu...@aliyun.com> wrote:

> Hi Rex,
>
>    I tried a similar example[1] but could not reproduce the issue. Which
> version of Flink are you using?
>
> Best,
>  Yun


Re: Re: Duplicate operators generated by plan

Posted by Yun Gao <yu...@aliyun.com>.
Hi Rex,

   I tried a similar example[1] but could not reproduce the issue. Which version of Flink are you using?

Best,
 Yun


[1] The example code:
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.setRestartStrategy(RestartStrategies.noRestart());
bsEnv.setParallelism(1);

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

DataStream<Tuple2<Integer, String>> source = bsEnv.addSource(new RichParallelSourceFunction<Tuple2<Integer, String>>() {

   @Override
   public void run(SourceContext<Tuple2<Integer, String>> sourceContext) throws Exception {
      sourceContext.collect(new Tuple2<>(0, "test"));
   }

   @Override
   public void cancel() {

   }
});

Table table = bsTableEnv.fromDataStream(
      source, $("id"), $("name"));
Table table2 = table.select(call("abs", $("id")), $("name"))
      .as("new_id", "new_name");

bsTableEnv.createTemporaryView("view", table2);
Table handled = bsTableEnv.sqlQuery("select new_id, FIRST_VALUE(new_name) as new_name from view group by new_id");

Table ret = table.join(handled)
      .where($("id").isEqual($("new_id")))
      .select($("id"), $("name"), $("new_name"));
System.out.println(ret.explain());

DataStream<Tuple2<Boolean, Row>> row = bsTableEnv.toRetractStream(ret, Row.class);
row.addSink(new SinkFunction<Tuple2<Boolean, Row>>() {
   @Override
   public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {

   }
});

System.out.println(bsEnv.getStreamGraph().getStreamingPlanAsJSON());
------------------------------------------------------------------
Sender:Rex Fenley<Re...@remind101.com>
Date:2020/12/04 14:18:21
Recipient:Yun Gao<yu...@aliyun.com>
Cc:user<us...@flink.apache.org>; Brad Davis<br...@remind101.com>
Subject:Re: Duplicate operators generated by plan

cc Brad


Re: Duplicate operators generated by plan

Posted by Rex Fenley <Re...@remind101.com>.
cc Brad

On Thu, Dec 3, 2020 at 10:17 PM Rex Fenley <Re...@remind101.com> wrote:

> Yes, the same exact input operators go into both joins.



Re: Duplicate operators generated by plan

Posted by Rex Fenley <Re...@remind101.com>.
Yes, the same exact input operators go into both joins.

Here is the code that produces the joins in the part of the plan I showed.
The orgUsersTable is later filtered into one table and aggregated, and
separately filtered into another table and aggregated. The planner seems to
duplicate orgUsersTable into two operators even though I create it only once.

// in the main function
val orgUsersTable = splatRoles(
  this.tableEnv.from(SOURCE_ORGANIZATIONS_USERS),
  OrgUsersRoleSplatPrefix,
  this.tableEnv
)

// helper function
def splatRoles(
  table: Table,
  columnPrefix: String,
  tableEnv: TableEnvironment
): Table = {
  // Flink does not have a contains function so we have to splat out our role array's contents
  // and join it to the originating table.
  val func = new SplatRolesFunc()
  val splatted = table
    .map(func($"roles", $"id"))
    .as(
      "id_splatted",
      s"${columnPrefix}_is_admin",
      s"${columnPrefix}_is_teacher",
      s"${columnPrefix}_is_student",
      s"${columnPrefix}_is_parent"
    )
  // FIRST_VALUE is only available in SQL - so this is SQL.
  // Rationale: We have to group by after a map to preserve the pk inference, otherwise flink will
  // toss it out and all future joins will not have a unique key.
  tableEnv.createTemporaryView(s"${columnPrefix}_splatted", splatted)
  val grouped = tableEnv.sqlQuery(s"""
    SELECT
      id_splatted,
      FIRST_VALUE(${columnPrefix}_is_admin) AS ${columnPrefix}_is_admin,
      FIRST_VALUE(${columnPrefix}_is_teacher) AS ${columnPrefix}_is_teacher,
      FIRST_VALUE(${columnPrefix}_is_student) AS ${columnPrefix}_is_student,
      FIRST_VALUE(${columnPrefix}_is_parent) AS ${columnPrefix}_is_parent
    FROM ${columnPrefix}_splatted
    GROUP BY id_splatted
  """)
  return table
    .join(grouped, $"id" === $"id_splatted")
    .dropColumns($"id_splatted")
    .renameColumns($"roles".as(s"${columnPrefix}_roles"))
}

@FunctionHint(
  output = new DataTypeHint(
    "(id_splatted BIGINT, is_admin BOOLEAN, is_teacher BOOLEAN, is_student BOOLEAN, is_parent BOOLEAN, PRIMARY KEY (id_splatted) NOT ENFORCED)"
  )
)
class SplatRolesFunc extends ScalarFunction {
  def eval(roles: Array[String], id: java.lang.Long): Row = {
    val isAdmin: java.lang.Boolean = roles.contains(Admin.rawValue)
    val isTeacher: java.lang.Boolean = roles.contains(Teacher.rawValue)
    val isStudent: java.lang.Boolean = roles.contains(Student.rawValue)
    val isParent: java.lang.Boolean = roles.contains(Parent.rawValue)
    return Row.of(id, isAdmin, isTeacher, isStudent, isParent)
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(
      Types.LONG,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN,
      Types.BOOLEAN
    )
}
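
For readers without a Flink setup, the per-row logic above can be sketched in plain Java. This is only an illustration, not the Flink job: the class name SplatRolesSketch, the Splatted type, and the role strings ("admin", "teacher", ...) are assumptions made for the example, since the actual values of Admin.rawValue and friends are not shown in this thread. It shows the splat step turning a roles array into one boolean per role, and two downstream filters reading the same list of splatted rows, which is built only once.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SplatRolesSketch {

    // Hypothetical role strings; the real values (Admin.rawValue etc.) are not shown in the thread.
    static final String ADMIN = "admin";
    static final String TEACHER = "teacher";
    static final String STUDENT = "student";
    static final String PARENT = "parent";

    // One "splatted" row: the id plus one boolean flag per role.
    static final class Splatted {
        final long id;
        final boolean isAdmin;
        final boolean isTeacher;
        final boolean isStudent;
        final boolean isParent;

        Splatted(long id, boolean isAdmin, boolean isTeacher, boolean isStudent, boolean isParent) {
            this.id = id;
            this.isAdmin = isAdmin;
            this.isTeacher = isTeacher;
            this.isStudent = isStudent;
            this.isParent = isParent;
        }
    }

    // Mirrors the eval method above: check the roles array once per role.
    static Splatted splat(long id, String[] roles) {
        List<String> r = Arrays.asList(roles);
        return new Splatted(id, r.contains(ADMIN), r.contains(TEACHER),
                r.contains(STUDENT), r.contains(PARENT));
    }

    // First downstream branch: keep only admin rows.
    static List<Long> adminIds(List<Splatted> rows) {
        return rows.stream().filter(s -> s.isAdmin).map(s -> s.id).collect(Collectors.toList());
    }

    // Second downstream branch: keep only teacher rows, reading the same input list.
    static List<Long> teacherIds(List<Splatted> rows) {
        return rows.stream().filter(s -> s.isTeacher).map(s -> s.id).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The splatted rows are computed once and shared by both branches.
        List<Splatted> rows = Arrays.asList(
                splat(1L, new String[] {"admin", "teacher"}),
                splat(2L, new String[] {"teacher"}),
                splat(3L, new String[] {"student"}));
        System.out.println(adminIds(rows));   // prints [1]
        System.out.println(teacherIds(rows)); // prints [1, 2]
    }
}
```

In this sketch the shared input is an ordinary list, so reuse is trivial; whether the Flink planner reuses the corresponding join operator is exactly the open question in this thread.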


On Thu, Dec 3, 2020 at 7:49 PM Yun Gao <yu...@aliyun.com> wrote:

> Hi Rex,
>
>     Could you also attach an example of the SQL / Table API code? One
> thing to confirm: do the operators with the same names also have the
> same inputs?
>
> Best,
> Yun


Re: Duplicate operators generated by plan

Posted by Yun Gao <yu...@aliyun.com>.
Hi Rex,

    Could you also attach an example of the SQL / Table API code? One thing to confirm: do the operators with the same names also have the same inputs?

Best,
Yun
