Posted to user@flink.apache.org by Andres Angel <in...@gmail.com> on 2019/07/25 00:15:03 UTC

LEFT JOIN issue SQL API

Hello all, I have registered some tables in my table environment and now I'm
trying to perform a query on them using LEFT JOIN, like the example below:

 Table fullenrichment = tenv.sqlQuery(
                "SELECT   pp.a,pp.b,pp.c,pp.d,pp.a " +
                        " FROM t1 pp LEFT JOIN t2 ent" +
                        " ON pp.b = ent.b" +
                        " LEFT JOIN t3 act " +
                        " ON pp.a = act.a "
        );

Once the query is defined, I need to convert the result into a DataStream<Row>:

DataStream<Row> results = tenv.toAppendStream(fullenrichment, Row.class);

I'm getting the following error. However, if I run the same code with INNER
JOIN instead of LEFT JOIN, the error disappears and the code works. Why does
this happen?

1930 [main] INFO
 org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  -
Flink Kinesis Consumer is going to read the following streams:
tr-stream-ingestion,
3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
class org.apache.flink.types.Row does not contain a getter for field fields
3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
class org.apache.flink.types.Row does not contain a setter for field fields
3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
class org.apache.flink.types.Row does not contain a getter for field fields
3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
class org.apache.flink.types.Row does not contain a setter for field fields
3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
class org.apache.flink.types.Row does not contain a getter for field fields
3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
class org.apache.flink.types.Row does not contain a setter for field fields
3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
Class class org.apache.flink.types.Row cannot be used as a POJO type
because not all fields are valid POJO fields, and must be processed as
GenericType. Please read the Flink documentation on "Data Types &
Serialization" for details of the effect on performance.
Exception in thread "main" org.apache.flink.table.api.TableException: Table
is not an append-only table. Use the toRetractStream() in order to handle
add and retract messages.
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:920)
at
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
at
org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
at consumer.trconsumer.main(trconsumer.java:180)

Re: LEFT JOIN issue SQL API

Posted by Fabian Hueske <fh...@gmail.com>.
If you need an outer join, the only solution is to convert the table into a
retraction stream and correctly handle the retraction messages.
By the way, even then this might not perform as you would like. The query
will keep all input tables completely in state, so you might run out of
space sooner or later if even one of the tables is an append-only stream.
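In the Flink 1.8 Java Table API, `tenv.toRetractStream(fullenrichment, Row.class)` returns a `DataStream<Tuple2<Boolean, Row>>`, where `true` marks an added row and `false` a retracted one. Handling the messages correctly means the consumer applies both kinds. The sketch below is Flink-free and only illustrative: `Message` is a hypothetical stand-in for `Tuple2<Boolean, Row>`, and a result row is modeled as a `List<String>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Flink-free sketch: Message is a hypothetical stand-in for Flink's Tuple2<Boolean, Row>,
// and a result row is modeled as a List<String>.
class RetractStreamSketch {

    static final class Message {
        final boolean isAdd;     // true = add message, false = retract message
        final List<String> row;  // the row being added or retracted

        Message(boolean isAdd, List<String> row) {
            this.isAdd = isAdd;
            this.row = row;
        }
    }

    // Applies messages in arrival order and returns the rows currently in the result.
    static List<List<String>> materialize(List<Message> messages) {
        List<List<String>> view = new ArrayList<>();
        for (Message m : messages) {
            if (m.isAdd) {
                view.add(m.row);
            } else {
                // A retraction removes one earlier, identical row (List.equals compares contents).
                view.remove(m.row);
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
                new Message(true, Arrays.asList("a1", null)),   // +(left, null)
                new Message(false, Arrays.asList("a1", null)),  // -(left, null)
                new Message(true, Arrays.asList("a1", "b1")));  // +(left, right)
        System.out.println(materialize(messages)); // prints [[a1, b1]]
    }
}
```

In a real job the same logic would live in whatever sink or operator consumes the retract stream; a sink that can only append (a log, a queue) cannot express the `-` message, which is why `toAppendStream()` refuses the table.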

You need to add temporal join constraints to ensure that the query can
release state for rows that will never be joined with any future rows.
I'd recommend having a look at this page [1].
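Why temporal constraints let state be released can be shown with a simplified, Flink-free sketch of a time-bounded LEFT JOIN, the idea behind the time-windowed joins described in [1]. It is not Flink's implementation; the class, the `bound` parameter and the watermark handling are assumptions for illustration: a right row matches a left row with the same key only if its timestamp lies in `[leftTs, leftTs + bound]`, right rows are assumed never to arrive before their left partner (so only left rows are buffered), and a watermark `w` promises no further rows with timestamps at or below `w`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified, hypothetical sketch of a time-bounded LEFT JOIN on an equality key.
// Assumptions: rightTs must lie in [leftTs, leftTs + bound]; right rows never arrive
// before the left row they match; watermark w means no further rows with ts <= w.
class BoundedLeftJoinSketch {

    static final class LeftRow {
        final String key;
        final long ts;
        boolean matched; // set once any right row has joined with this left row

        LeftRow(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    private final long bound;
    private final List<LeftRow> state = new ArrayList<>();
    final List<String> output = new ArrayList<>(); // append-only: nothing is ever retracted

    BoundedLeftJoinSketch(long bound) {
        this.bound = bound;
    }

    void onLeft(String key, long ts) {
        state.add(new LeftRow(key, ts));
    }

    void onRight(String key, long ts) {
        for (LeftRow l : state) {
            if (l.key.equals(key) && ts >= l.ts && ts <= l.ts + bound) {
                l.matched = true;
                output.add("+(" + l.key + "@" + l.ts + ", " + key + "@" + ts + ")");
            }
        }
    }

    // Once the watermark reaches leftTs + bound, no future right row can match:
    // emit the null-padded row if it never matched, then release it from state.
    void onWatermark(long watermark) {
        Iterator<LeftRow> it = state.iterator();
        while (it.hasNext()) {
            LeftRow l = it.next();
            if (watermark >= l.ts + bound) {
                if (!l.matched) {
                    output.add("+(" + l.key + "@" + l.ts + ", null)");
                }
                it.remove();
            }
        }
    }

    int stateSize() {
        return state.size();
    }

    public static void main(String[] args) {
        BoundedLeftJoinSketch join = new BoundedLeftJoinSketch(10);
        join.onLeft("k1", 0);
        join.onLeft("k2", 5);
        join.onRight("k1", 8);  // 8 is within [0, 10], so it joins with k1@0
        join.onWatermark(16);   // the windows of k1@0 and k2@5 have both closed
        System.out.println(join.output);      // prints [+(k1@0, k1@8), +(k2@5, null)]
        System.out.println(join.stateSize()); // prints 0
    }
}
```

The null-padded row for `k2` is emitted only after its window closes, so it never has to be retracted, and the state shrinks back to zero instead of growing forever.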

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/streaming/joins.html

On Thu, Jul 25, 2019 at 13:54, Andres Angel <ingenieroandresangel@gmail.com> wrote:

> Thanks so much for your answer, but then how should I perform such a
> comparison? Which options do we have?
> Thanks

Re: LEFT JOIN issue SQL API

Posted by Andres Angel <in...@gmail.com>.
Thanks so much for your answer, but then how should I perform such a
comparison? Which options do we have?
Thanks

On Wed, Jul 24, 2019, 10:01 PM, Ruidong Li <le...@gmail.com> wrote:

> Hi, it's because outer joins generate retractions. Consider the behavior
> of a LEFT OUTER JOIN:
>
> 1. A left record arrives and there is no matching right record yet, so
>    +(left, null) is emitted.
> 2. A matching right record arrives. The previous result must be retracted,
>    so -(left, null) and +(left, right) are emitted.

Re: LEFT JOIN issue SQL API

Posted by Ruidong Li <le...@gmail.com>.
Hi, it's because outer joins generate retractions. Consider the behavior
of a LEFT OUTER JOIN:

1. A left record arrives and there is no matching right record yet, so
   +(left, null) is emitted.
2. A matching right record arrives. The previous result must be retracted,
   so -(left, null) and +(left, right) are emitted.
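The two steps above can be replayed in a small, Flink-free sketch. It is illustrative only: the class and method names are hypothetical, rows are plain strings, and each join key is assumed to occur at most once on each side. The LEFT JOIN changelog contains a `-` message, while the INNER JOIN on the same input produces only `+` messages, which matches the behavior described in the original question.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch of the changelog produced by a regular (unbounded) join.
// Simplifying assumption: each join key occurs at most once on each side.
class LeftJoinChangelogSketch {

    private final boolean leftOuter;                                // true = LEFT JOIN, false = INNER JOIN
    private final Map<String, String> leftState = new HashMap<>();  // key -> left row, never evicted
    private final Map<String, String> rightState = new HashMap<>(); // key -> right row, never evicted
    final List<String> changelog = new ArrayList<>();

    LeftJoinChangelogSketch(boolean leftOuter) {
        this.leftOuter = leftOuter;
    }

    void onLeft(String key, String row) {
        leftState.put(key, row);
        String right = rightState.get(key);
        if (right != null) {
            changelog.add("+(" + row + ", " + right + ")");
        } else if (leftOuter) {
            // Step 1: no matching right row yet, so emit a null-padded result.
            changelog.add("+(" + row + ", null)");
        }
    }

    void onRight(String key, String row) {
        rightState.put(key, row);
        String left = leftState.get(key);
        if (left == null) {
            return; // nothing to join with yet; the row just waits in state
        }
        if (leftOuter) {
            // Step 2: the earlier null-padded result is now wrong and must be retracted.
            changelog.add("-(" + left + ", null)");
        }
        changelog.add("+(" + left + ", " + row + ")");
    }

    public static void main(String[] args) {
        for (boolean outer : new boolean[] {true, false}) {
            LeftJoinChangelogSketch join = new LeftJoinChangelogSketch(outer);
            join.onLeft("k", "left");
            join.onRight("k", "right");
            System.out.println((outer ? "LEFT JOIN:  " : "INNER JOIN: ") + join.changelog);
        }
    }
}
```

Running `main` prints `LEFT JOIN:  [+(left, null), -(left, null), +(left, right)]` followed by `INNER JOIN: [+(left, right)]`. Both state maps also keep every row forever, which is the unbounded-state problem Fabian points out in his reply.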

On Thu, Jul 25, 2019, 8:15 AM, Andres Angel <in...@gmail.com> wrote:

> Hello all, I have registered some tables in my table environment and now I'm
> trying to perform a query on them using LEFT JOIN, like the example below:
>
>  Table fullenrichment = tenv.sqlQuery(
>                 "SELECT   pp.a,pp.b,pp.c,pp.d,pp.a " +
>                         " FROM t1 pp LEFT JOIN t2 ent" +
>                         " ON pp.b = ent.b" +
>                         " LEFT JOIN t3 act " +
>                         " ON pp.a = act.a "
>         );
>
> Once the query is defined, I need to convert the result into a DataStream<Row>:
>
> DataStream<Row> results = tenv.toAppendStream(fullenrichment, Row.class);
>
> I'm getting the following error. However, if I run the same code with INNER
> JOIN instead of LEFT JOIN, the error disappears and the code works. Why does
> this happen?
>
> 1930 [main] INFO
>  org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer  -
> Flink Kinesis Consumer is going to read the following streams:
> tr-stream-ingestion,
> 3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> class org.apache.flink.types.Row does not contain a getter for field fields
> 3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> class org.apache.flink.types.Row does not contain a setter for field fields
> 3698 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> Class class org.apache.flink.types.Row cannot be used as a POJO type
> because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> 3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> class org.apache.flink.types.Row does not contain a getter for field fields
> 3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> class org.apache.flink.types.Row does not contain a setter for field fields
> 3730 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> Class class org.apache.flink.types.Row cannot be used as a POJO type
> because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> 3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> class org.apache.flink.types.Row does not contain a getter for field fields
> 3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> class org.apache.flink.types.Row does not contain a setter for field fields
> 3753 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  -
> Class class org.apache.flink.types.Row cannot be used as a POJO type
> because not all fields are valid POJO fields, and must be processed as
> GenericType. Please read the Flink documentation on "Data Types &
> Serialization" for details of the effect on performance.
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Table is not an append-only table. Use the toRetractStream() in order to
> handle add and retract messages.
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:920)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:896)
> at
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
> at
> org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
> at consumer.trconsumer.main(trconsumer.java:180)
>