Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2021/02/15 07:20:39 UTC
Generated SinkConversion code ignores incoming StreamRecord timestamp
Hi,
I have a source that generates events with timestamps. These flow nicely,
until encountering a conversion from Table -> DataStream[Row]:
    def toRowRetractStream(implicit ev: TypeInformation[Row]): DataStream[Row] =
      table
        .toRetractStream[Row]
        .flatMap { (row, collector: Collector[Row]) =>
          if (row._1)
            collector.collect(row._2)
        }
The transformation causes a SinkConversion to be generated with the
following code:
    @Override
    public void processElement(
        org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
      org.apache.flink.table.data.RowData in1 =
          (org.apache.flink.table.data.RowData) element.getValue();
      Object[] fields$12 = new Object[2];
      fields$12[0] = org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg(in1);
      fields$12[1] = (org.apache.flink.types.Row)
          converter$9.toExternal((org.apache.flink.table.data.RowData) in1);
      scala.Tuple2 result$10 = (scala.Tuple2) serializer$11.createInstance(fields$12);
      output.collect(outElement.replace(result$10));
    }
The code receives an element of type StreamRecord, which does have a
timestamp attached to it, but fails to forward it to the new element
(outElement) which is initialized as:
    private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
        new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
Am I missing anything in the Table -> DataStream[Row] conversion that
should make the timestamp follow through? Or is this a bug?
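
To make the symptom concrete, here is a minimal, self-contained sketch of the behaviour described above. It does not use Flink: Record is a hypothetical stand-in that only mimics the relevant StreamRecord semantics (replacing the value keeps whatever timestamp state the reused record already has), and convertDropping / convertForwarding are illustrative names, not generated code. The forwarding variant shows one way the timestamp could be carried over; it is not necessarily how the actual fix is implemented.

```java
public class TimestampForwarding {

    /** Hypothetical stand-in for a stream record: a value plus an optional timestamp. */
    static final class Record<T> {
        private T value;
        private long timestamp;
        private boolean hasTimestamp;

        Record(T value) {
            this.value = value;
        }

        Record(T value, long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
            this.hasTimestamp = true;
        }

        T getValue() {
            return value;
        }

        boolean hasTimestamp() {
            return hasTimestamp;
        }

        long getTimestamp() {
            if (!hasTimestamp) {
                throw new IllegalStateException("record has no timestamp");
            }
            return timestamp;
        }

        /** Swaps the value only; the record's existing timestamp state is untouched. */
        Record<T> replace(T newValue) {
            this.value = newValue;
            return this;
        }

        /** Swaps the value and sets the timestamp. */
        Record<T> replace(T newValue, long newTimestamp) {
            this.value = newValue;
            this.timestamp = newTimestamp;
            this.hasTimestamp = true;
            return this;
        }

        void eraseTimestamp() {
            this.hasTimestamp = false;
        }
    }

    /** Mirrors the reported pattern: only the value is replaced, so the input timestamp is lost. */
    static Record<String> convertDropping(Record<String> element, Record<String> out) {
        return out.replace(element.getValue().toUpperCase());
    }

    /** Carries the input timestamp over to the reused output record when one is present. */
    static Record<String> convertForwarding(Record<String> element, Record<String> out) {
        String result = element.getValue().toUpperCase();
        if (element.hasTimestamp()) {
            return out.replace(result, element.getTimestamp());
        }
        // Avoid leaking a stale timestamp from a previously emitted record.
        out.eraseTimestamp();
        return out.replace(result);
    }

    public static void main(String[] args) {
        Record<String> in = new Record<>("row", 42L);

        Record<String> dropped = convertDropping(in, new Record<>(null));
        System.out.println("dropping variant has timestamp: " + dropped.hasTimestamp());

        Record<String> forwarded = convertForwarding(in, new Record<>(null));
        System.out.println("forwarding variant timestamp: " + forwarded.getTimestamp());
    }
}
```

In the sketch, the output record starts out the same way as outElement above (constructed from a null value with no timestamp), which is why the value-only replace yields a record without a timestamp.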
--
Best Regards,
Yuval Itzchakov.
Re: Generated SinkConversion code ignores incoming StreamRecord timestamp
Posted by Dawid Wysakowicz <dw...@apache.org>.
The best I can do is point you to the thread[1].
I am also cc'ing Yuan who is the release manager for 1.12.2.
Best,
Dawid
[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Apache-Flink-1-12-2-td48603.html
On 15/02/2021 08:51, Yuval Itzchakov wrote:
> Hi Dawid,
> Yes, looks like it. Thanks!
>
> Is there an ETA on 1.12.2 yet?
>
> On Mon, Feb 15, 2021 at 9:48 AM Dawid Wysakowicz <dw...@apache.org> wrote:
>
> Hey Yuval,
>
> Could it be that you are hitting this bug[1], which has been fixed
> recently?
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-21013
Re: Generated SinkConversion code ignores incoming StreamRecord timestamp
Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Dawid,
Yes, looks like it. Thanks!
Is there an ETA on 1.12.2 yet?
On Mon, Feb 15, 2021 at 9:48 AM Dawid Wysakowicz <dw...@apache.org>
wrote:
> Hey Yuval,
>
> Could it be that you are hitting this bug[1], which has been fixed
> recently?
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-21013
--
Best Regards,
Yuval Itzchakov.
Re: Generated SinkConversion code ignores incoming StreamRecord timestamp
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Yuval,
Could it be that you are hitting this bug[1], which has been fixed recently?
Best,
Dawid
[1] https://issues.apache.org/jira/browse/FLINK-21013