Posted to dev@flink.apache.org by Ingo Bürk <in...@ververica.com> on 2021/08/20 11:55:49 UTC

Projection pushdown for metadata columns

Hi everyone,

according to the SupportsReadableMetadata interface, the planner is
supposed to project required metadata columns prior to applying them:

> The planner will select required metadata columns (i.e. perform
> projection push down) and will call applyReadableMetadata(List, DataType)
> with a list of metadata keys.

However, from my experiments it seems that this is not true: regardless of
what columns I select from a table, #applyReadableMetadata always seems to
be called with all metadata declared in the schema of the table. Metadata
columns are also excluded from SupportsProjectionPushDown#applyProjection,
so the source cannot perform the projection either.

This is in Flink 1.13.2. Am I misreading the docs here or is this not
working as intended?


Best
Ingo
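[Editor's note] The contract quoted above can be modeled in isolation. The sketch below is not Flink planner code; the class and method names are invented, and it only illustrates the pruning step the javadoc promises: intersect the query's projected columns with the declared metadata keys before calling applyReadableMetadata.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

// Standalone model of the quoted contract (invented names, not Flink code):
// the planner should keep only the metadata keys that survive the query's
// projection and pass exactly those to applyReadableMetadata.
public class MetadataProjection {

    public static List<String> requiredMetadataKeys(
            List<String> declaredMetadataKeys, List<String> projectedColumns) {
        LinkedHashSet<String> projected = new LinkedHashSet<>(projectedColumns);
        List<String> required = new ArrayList<>();
        for (String key : declaredMetadataKeys) {
            if (projected.contains(key)) {
                required.add(key); // declared AND selected -> must be produced
            }
        }
        return required;
    }

    public static void main(String[] args) {
        List<String> declared = List.of("m1", "m2");
        // SELECT f0, m1 -> only m1 should reach applyReadableMetadata.
        System.out.println(requiredMetadataKeys(declared, List.of("f0", "m1"))); // [m1]
        // SELECT f0 -> empty list; the source could skip metadata entirely.
        System.out.println(requiredMetadataKeys(declared, List.of("f0"))); // []
    }
}
```

Under this model, the behavior reported in the thread corresponds to the planner skipping the pruning step and passing the declared keys through unchanged.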

Re: Projection pushdown for metadata columns

Posted by Ingo Bürk <in...@ververica.com>.
Hi Till,

it actually seems to be two issues: one is a bug, and one would be an
improvement (it never worked before). I'll split the tickets later. I'll
raise the bug ticket to Critical since I want to fix it for 1.14, but it
was also broken in 1.13 already, so it shouldn't be a blocker.


Best
Ingo


Re: Projection pushdown for metadata columns

Posted by Till Rohrmann <tr...@apache.org>.
Does this also affect Flink 1.14.0? If yes, do we want to fix this issue
for the upcoming release? If yes, then please make this issue a blocker or
at least critical.

Cheers,
Till


Re: Projection pushdown for metadata columns

Posted by Ingo Bürk <in...@ververica.com>.
Thanks Timo for the confirmation. I've also raised FLINK-23911[1] for this.

[1] https://issues.apache.org/jira/browse/FLINK-23911


Best
Ingo


Re: Projection pushdown for metadata columns

Posted by Timo Walther <tw...@apache.org>.
Hi everyone,

this definitely sounds like a bug to me. Computing metadata might be
very expensive, and a connector might expose a long list of metadata
keys. It was therefore intended to project the metadata if possible. I'm
pretty sure that this worked before (at least when implementing
SupportsProjectionPushDown). Maybe a bug was introduced when adding the
Spec support.

Regards,
Timo
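[Editor's note] Timo's cost argument can be made concrete with a toy sketch (hypothetical names, not Flink's interfaces): a source computes one value per applied metadata key per record, so pushing all declared keys forces work for metadata columns the query never selects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Toy sketch (invented names, not Flink API): a source is handed metadata
// keys and evaluates one extractor per key per record. If the planner pushes
// every declared key, the expensive, never-selected key is still computed.
public class MetadataCost {
    static AtomicInteger expensiveCalls = new AtomicInteger();

    static Map<String, Supplier<String>> extractors = Map.of(
            "m1", () -> "cheap",
            "m2", () -> { expensiveCalls.incrementAndGet(); return "expensive"; });

    // Emits one record: the physical column plus one value per applied key.
    static List<String> emitRecord(List<String> appliedMetadataKeys) {
        List<String> row = new ArrayList<>();
        row.add("f0-value"); // physical column f0
        for (String key : appliedMetadataKeys) {
            row.add(extractors.get(key).get());
        }
        return row;
    }

    public static void main(String[] args) {
        // Observed: all declared keys are applied even for "SELECT f0, m1".
        emitRecord(List.of("m1", "m2"));
        System.out.println(expensiveCalls.get()); // 1 -> m2 computed needlessly

        // Intended: only the projected key is applied; m2 is never computed.
        expensiveCalls.set(0);
        emitRecord(List.of("m1"));
        System.out.println(expensiveCalls.get()); // 0
    }
}
```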




Re: Projection pushdown for metadata columns

Posted by Ingo Bürk <in...@ververica.com>.
Hi Jingsong,

thanks for your answer. Even if the source implements
SupportsProjectionPushDown, #applyProjection will never be called with
projections for metadata columns. For example, I have the following test:

@Test
def test(): Unit = {
  val tableId = TestValuesTableFactory.registerData(Seq())

  tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.INT())
      .columnByMetadata("m1", DataTypes.STRING())
      .columnByMetadata("m2", DataTypes.STRING())
      .build())
    .option("data-id", tableId)
    .option("bounded", "true")
    .option("readable-metadata", "m1:STRING,m2:STRING")
    .build())

  tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
}

Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
is always called with m1 + m2, and #applyProjection only ever sees f0. So
as far as I can tell, the source has no way of knowing which metadata
columns are actually needed (under the projection); it always has to
produce metadata for all metadata columns declared in the table's schema.

In PushProjectIntoTableSourceScanRule I also haven't yet found anything
that would suggest that metadata columns are first projected and only then
pushed to the source. I think the correct behavior would be to call
#applyReadableMetadata only after the metadata columns have been considered
in the projection.


Best
Ingo



Re: Projection pushdown for metadata columns

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

I remember that the projection only works together with SupportsProjectionPushDown.

You can take a look at
`PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.

applyReadableMetadata will be called again in PushProjectIntoTableSourceScanRule.

But there may be a bug in
PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:

if (!usedMetadataNames.isEmpty()) {
    sourceAbilitySpecs.add(
        new ReadingMetadataSpec(usedMetadataNames, newProducedType));
}

If there is no metadata column left after the projection, we should still
apply the spec: we should tell the source that no metadata columns remain.
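[Editor's note] Jingsong's suggestion — drop the emptiness guard so the spec is applied even with an empty key list — can be sketched standalone (hypothetical types; ReadingMetadataSpec here is a stand-in record, not the real planner class):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the suggested change (invented types, not the real
// planner rule): apply the metadata spec unconditionally, so that an empty
// key list explicitly tells the source no metadata survived the projection.
public class MetadataSpecSketch {

    record ReadingMetadataSpec(List<String> metadataKeys) {}

    // Current shape: the spec is only added when some metadata key survives,
    // so a source is never told that all metadata was projected away.
    static List<ReadingMetadataSpec> buggy(List<String> usedMetadataNames) {
        List<ReadingMetadataSpec> specs = new ArrayList<>();
        if (!usedMetadataNames.isEmpty()) {
            specs.add(new ReadingMetadataSpec(usedMetadataNames));
        }
        return specs;
    }

    // Suggested shape: always add the spec; an empty list is meaningful.
    static List<ReadingMetadataSpec> fixed(List<String> usedMetadataNames) {
        List<ReadingMetadataSpec> specs = new ArrayList<>();
        specs.add(new ReadingMetadataSpec(usedMetadataNames));
        return specs;
    }

    public static void main(String[] args) {
        List<String> none = List.of(); // e.g. SELECT f0 FROM T
        System.out.println(buggy(none).size()); // 0 -> source never notified
        System.out.println(fixed(none).size()); // 1 -> source told "no metadata"
    }
}
```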

Best,
Jingsong




-- 
Best, Jingsong Lee