Posted to user@flink.apache.org by Yuval Itzchakov <yu...@gmail.com> on 2021/02/15 07:20:39 UTC

Generated SinkConversion code ignores incoming StreamRecord timestamp

Hi,

I have a source that generates events with timestamps. The timestamps flow
through nicely until they reach a conversion from Table -> DataStream[Row]:

    def toRowRetractStream(implicit ev: TypeInformation[Row]): DataStream[Row] =
      table
        .toRetractStream[Row]
        .flatMap { (row, collector: Collector[Row]) =>
          // keep accumulate messages (flag = true), drop retractions
          if (row._1)
            collector.collect(row._2)
        }

The transformation causes a SinkConversion to be generated with the
following code:

        @Override
        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue();

          Object[] fields$12 = new Object[2];
          fields$12[0] = org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg(in1);
          fields$12[1] = (org.apache.flink.types.Row) converter$9.toExternal((org.apache.flink.table.data.RowData) in1);
          scala.Tuple2 result$10 = (scala.Tuple2) serializer$11.createInstance(fields$12);
          output.collect(outElement.replace(result$10));
        }
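To make the data flow concrete, here is a small Flink-free Java sketch that replays what the generated processElement above and the flatMap in the first snippet do to a sequence of changes. Kind, Change, and all method names are hypothetical stand-ins, not Flink classes; the flag rule is the one RowDataUtil.isAccumulateMsg applies as far as I know (INSERT and UPDATE_AFTER are accumulate messages, UPDATE_BEFORE and DELETE are retractions).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch: Kind and Change only mirror the shapes in
// the generated code; they are not Flink classes.
class RetractConversionSketch {

    // Mirrors the four change kinds of Flink's RowKind.
    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // One change message; a String stands in for the converted Row.
    static final class Change {
        final Kind kind;
        final String row;

        Change(Kind kind, String row) {
            this.kind = kind;
            this.row = row;
        }
    }

    // Assumed flag rule of RowDataUtil.isAccumulateMsg: INSERT and
    // UPDATE_AFTER accumulate, UPDATE_BEFORE and DELETE retract.
    static boolean isAccumulate(Change c) {
        return c.kind == Kind.INSERT || c.kind == Kind.UPDATE_AFTER;
    }

    // Generated conversion (flag -> fields[0], row -> fields[1]) followed by
    // the user's flatMap, which keeps only rows whose flag is true.
    static List<String> keepAccumulates(List<Change> changes) {
        List<String> out = new ArrayList<>();
        for (Change c : changes) {
            boolean flag = isAccumulate(c); // fields[0] of the Tuple2
            String row = c.row;             // fields[1] of the Tuple2
            if (flag) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Change> changes = List.of(
                new Change(Kind.INSERT, "a=1"),
                new Change(Kind.UPDATE_BEFORE, "a=1"),
                new Change(Kind.UPDATE_AFTER, "a=2"),
                new Change(Kind.DELETE, "a=2"));
        System.out.println(keepAccumulates(changes)); // prints [a=1, a=2]
    }
}
```

Filtering on the flag, as the flatMap does, means UPDATE_BEFORE and DELETE messages never reach downstream operators.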

The code receives a StreamRecord element, which does have a timestamp
attached, but never forwards that timestamp to the output element
(outElement), which is initialized as:

        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
            new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
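The Java sketch below illustrates why the timestamp is lost. Rec is a hypothetical stand-in written to mirror StreamRecord's replace semantics as I understand them: replace(value) swaps only the value and leaves the timestamp state alone, while replace(value, timestamp) also attaches a timestamp. Since outElement starts without a timestamp and only replace(value) is ever called, the output never carries one. processForwarding shows one possible way a conversion could copy the timestamp across; it is an illustration only, not necessarily how the generated code was actually fixed.

```java
// Hypothetical, Flink-free sketch. Rec only mimics the parts of StreamRecord
// used here; it is not the Flink class.
class TimestampForwardingSketch {

    static final class Rec<T> {
        private T value;
        private long timestamp;
        private boolean hasTimestamp;

        // Starts without a timestamp, like new StreamRecord(null).
        Rec(T value) {
            this.value = value;
        }

        // Swaps the value only; the timestamp state is left untouched.
        Rec<T> replace(T newValue) {
            this.value = newValue;
            return this;
        }

        // Swaps the value and attaches a timestamp.
        Rec<T> replace(T newValue, long ts) {
            this.value = newValue;
            this.timestamp = ts;
            this.hasTimestamp = true;
            return this;
        }

        void eraseTimestamp() {
            this.hasTimestamp = false;
        }

        boolean hasTimestamp() {
            return hasTimestamp;
        }

        // Long.MIN_VALUE signals "no timestamp".
        long getTimestamp() {
            return hasTimestamp ? timestamp : Long.MIN_VALUE;
        }

        T getValue() {
            return value;
        }
    }

    // Reused output record, created without a timestamp.
    private final Rec<String> outElement = new Rec<>(null);

    // Hypothetical conversion step standing in for converter.toExternal(...).
    private static String convert(String in) {
        return "converted(" + in + ")";
    }

    // Same pattern as the generated code: only the value is replaced.
    Rec<String> processDropping(Rec<String> element) {
        return outElement.replace(convert(element.getValue()));
    }

    // Forwarding variant: copy the input timestamp if present, otherwise
    // clear it so a stale timestamp from an earlier record is not emitted.
    Rec<String> processForwarding(Rec<String> element) {
        String result = convert(element.getValue());
        if (element.hasTimestamp()) {
            return outElement.replace(result, element.getTimestamp());
        }
        outElement.eraseTimestamp();
        return outElement.replace(result);
    }

    public static void main(String[] args) {
        Rec<String> in = new Rec<String>(null).replace("row", 1_000L);
        System.out.println(new TimestampForwardingSketch().processDropping(in).hasTimestamp());   // prints false
        System.out.println(new TimestampForwardingSketch().processForwarding(in).getTimestamp()); // prints 1000
    }
}
```

The eraseTimestamp call matters because outElement is reused across records: without it, a record that has no timestamp would inherit the previous record's timestamp.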

Am I missing something in the Table -> DataStream[Row] conversion that
should make the timestamp carry through, or is this a bug?

-- 
Best Regards,
Yuval Itzchakov.

Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

Posted by Dawid Wysakowicz <dw...@apache.org>.
The best I can do is point you to the release discussion thread[1].

I am also cc'ing Yuan, who is the release manager for 1.12.2.

Best,

Dawid

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Apache-Flink-1-12-2-td48603.html

On 15/02/2021 08:51, Yuval Itzchakov wrote:
> Hi Dawid,
> Yes, looks like it. Thanks!
>
> Is there an ETA on 1.12.2 yet?
>
> -- 
> Best Regards,
> Yuval Itzchakov.

Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

Posted by Yuval Itzchakov <yu...@gmail.com>.
Hi Dawid,
Yes, looks like it. Thanks!

Is there an ETA on 1.12.2 yet?

On Mon, Feb 15, 2021 at 9:48 AM Dawid Wysakowicz <dw...@apache.org> wrote:

> Hey Yuval,
>
> Could it be that you are hitting this bug[1], which has been fixed
> recently?
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-21013

-- 
Best Regards,
Yuval Itzchakov.

Re: Generated SinkConversion code ignores incoming StreamRecord timestamp

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hey Yuval,

Could it be that you are hitting this bug[1], which has been fixed recently?

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-21013
