Posted to user@flink.apache.org by Till Rohrmann <tr...@apache.org> on 2020/11/06 17:56:31 UTC

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Hi Simone,

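The problem is that the Java 1.8 compiler cannot do type inference when
chaining methods [1]. With nothing to pin down the type parameter of
forMonotonousTimestamps(), it is inferred as Object, so the lambda's event
parameter is an Object and event.getTime() does not resolve.

The solution is to specify the type parameter explicitly:

WatermarkStrategy<Event> wmStrategy =
        WatermarkStrategy
                .<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> {
                    return event.getTime();
                });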
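
To make the inference issue easier to reproduce without a Flink dependency, here is a minimal sketch. Strategy<T> and its methods are hypothetical stand-ins that only mimic the shape of WatermarkStrategy; they are not the Flink API, and the Event class is reduced to the one getter that matters here. It shows two ways that should satisfy the compiler: the explicit type witness from above, and assigning the factory result to a typed variable before chaining.

```java
import java.util.function.ToLongBiFunction;

public class TypeWitnessDemo {

    /** Reduced stand-in for the Event class from the question. */
    static final class Event {
        private final long time;

        Event(long time) {
            this.time = time;
        }

        long getTime() {
            return time;
        }
    }

    /** Hypothetical stand-in for WatermarkStrategy<T>; NOT the Flink API. */
    static final class Strategy<T> {
        private final ToLongBiFunction<T, Long> assigner;

        private Strategy(ToLongBiFunction<T, Long> assigner) {
            this.assigner = assigner;
        }

        // T appears only in the return type, so the compiler needs either a
        // target type or an explicit type witness to determine it.
        static <T> Strategy<T> forMonotonousTimestamps() {
            return new Strategy<T>((element, recordTimestamp) -> recordTimestamp);
        }

        Strategy<T> withTimestampAssigner(ToLongBiFunction<T, Long> newAssigner) {
            return new Strategy<T>(newAssigner);
        }

        long extractTimestamp(T element, long recordTimestamp) {
            return assigner.applyAsLong(element, recordTimestamp);
        }
    }

    // Variant 1: explicit type witness on the generic factory call.
    static Strategy<Event> withTypeWitness() {
        return Strategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner((event, timestamp) -> event.getTime());
    }

    // Variant 2: assign first, so the declared type drives the inference.
    static Strategy<Event> withIntermediateVariable() {
        Strategy<Event> base = Strategy.forMonotonousTimestamps();
        return base.withTimestampAssigner((event, timestamp) -> event.getTime());
    }

    // Does not compile: the receiver of the chained call has no target type,
    // T becomes Object, and event.getTime() cannot be resolved.
    // Strategy<Event> broken = Strategy.forMonotonousTimestamps()
    //         .withTimestampAssigner((event, timestamp) -> event.getTime());

    public static void main(String[] args) {
        Event event = new Event(1604685391000L);
        System.out.println(withTypeWitness().extractTimestamp(event, -1L));
        System.out.println(withIntermediateVariable().extractTimestamp(event, -1L));
    }
}
```

Both variants give the compiler the element type before the lambda is checked; the type witness keeps everything in one expression, which is probably why it is the form used in chained builder code.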

@Aljoscha Krettek <al...@apache.org> I think we need to update the
documentation about it. We have some examples which don't take this into
account.

[1]
https://e.printstacktrace.blog/java-type-inference-generic-methods-chain-call/

Cheers,
Till

On Fri, Nov 6, 2020 at 4:19 PM Simone Cavallarin <ca...@hotmail.com>
wrote:

> Hi,
>
> I'm taking the timestamp from the event payload that I'm receiving from
> Kafka.
>
> I'm struggling to get the time and I'm confused about how I should use the
> function ".withTimestampAssigner()". I'm receiving an error on
> event.getTime() that is telling me: "cannot resolve method 'getTime' in
> 'Object'" and I really don't understand how I can fix it. My class is
> providing a long so the variable itself should be fine. Any help would be
> really appreciated.
>
> This is my code:
>
>     FlinkKafkaConsumer<Event> kafkaData =
>             new FlinkKafkaConsumer("CorID_0", new EventDeserializationSchema(), p);
>     WatermarkStrategy<Event> wmStrategy =
>             WatermarkStrategy
>                     .forMonotonousTimestamps()
>                     .withTimestampAssigner((event, timestamp) -> {
>                         return event.getTime();
>                     });
>
>     DataStream<Event> stream = env.addSource(
>             kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>
> And to give you the idea of the whole project,
>
> This is the EventDeserializationSchema class:
>
> public class EventDeserializationSchema implements DeserializationSchema<Event> {
>
>     private static final long serialVersionUID = 1L;
>
>     private static final CsvSchema schema = CsvSchema.builder()
>             .addColumn("firstName")
>             .addColumn("lastName")
>             .addColumn("age", CsvSchema.ColumnType.NUMBER)
>             .addColumn("time")
>             .build();
>
>     private static final ObjectMapper mapper = new CsvMapper();
>
>     @Override
>     public Event deserialize(byte[] message) throws IOException {
>         return mapper.readerFor(Event.class).with(schema).readValue(message);
>     }
>
>     @Override
>     public boolean isEndOfStream(Event nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<Event> getProducedType() {
>         return TypeInformation.of(Event.class);
>     }
> }
>
> And this is the Event class:
>
> public class Event implements Serializable {
>     public String firstName;
>     public String lastName;
>     private int age;
>     public Long time;
>
>     public Event() {
>     }
>
>     public String getFirstName() {
>         return firstName;
>     }
>
>     public void setFirstName(String firstName) {
>         this.firstName = firstName;
>     }
>
>     public String getLastName() {
>         return lastName;
>     }
>
>     public void setLastName(String lastName) {
>         this.lastName = lastName;
>     }
>
>     public int getAge() {
>         return age;
>     }
>
>     public void setAge(int age) {
>         this.age = age;
>     }
>
>     public long getTime() {
>         return time;
>     }
>
>     public void setTime(String kafkaTime) {
>         long tn = OffsetDateTime.parse(kafkaTime).toInstant().toEpochMilli();
>         this.time = tn;
>     }
> }

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Posted by Till Rohrmann <tr...@apache.org>.
I think in the JavaDocs of the WatermarkStrategy we give an incorrect
example. I have created an issue [1] to fix the problem.

[1] https://issues.apache.org/jira/browse/FLINK-20156

Cheers,
Till

On Mon, Nov 9, 2020 at 12:06 PM Aljoscha Krettek <al...@apache.org>
wrote:

> @Till For instances where we use withTimestampAssigner() the examples in
> the docs always use the explicit generic parameter. (See
> event_timestamps_watermarks.md and streaming_analytics.md). For cases
> where we don't use withTimestampAssigner() we don't need the extra
> generic parameter because the compiler can figure it out.
>
> But yes, the Java compiler is not very helpful here... 😅
>
>
> Best,
> Aljoscha

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Posted by Aljoscha Krettek <al...@apache.org>.
@Till For instances where we use withTimestampAssigner() the examples in 
the docs always use the explicit generic parameter. (See 
event_timestamps_watermarks.md and streaming_analytics.md). For cases 
where we don't use withTimestampAssigner() we don't need the extra 
generic parameter because the compiler can figure it out.
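
The difference between the chained and the unchained case can be illustrated with plain JDK classes. This is only a sketch using Stream.builder(), which has the same shape as the generic factory discussed here (a static generic method whose type parameter appears only in the return type); it is not taken from the Flink docs.

```java
import java.util.stream.Stream;

public class TargetTypingDemo {

    // No chaining: the declared type of the variable is the target type,
    // so the compiler infers T = String for Stream.builder().
    static int withoutChaining() {
        Stream.Builder<String> builder = Stream.builder();
        builder.add("flink");
        builder.add("java");
        return builder.build().mapToInt(String::length).sum();
    }

    // Chaining directly onto the generic call: the explicit <String> is needed,
    // otherwise T is inferred as Object and String::length does not fit.
    static int withChaining() {
        return Stream.<String>builder()
                .add("flink")
                .add("java")
                .build()
                .mapToInt(String::length)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(withoutChaining());
        System.out.println(withChaining());
    }
}
```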

But yes, the Java compiler is not very helpful here... 😅


Best,
Aljoscha

On 09.11.20 09:35, Till Rohrmann wrote:
> Glad to hear it!
> 
> Cheers,
> Till


Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Posted by Till Rohrmann <tr...@apache.org>.
Glad to hear it!

Cheers,
Till

On Sun, Nov 8, 2020 at 8:02 PM Simone Cavallarin <ca...@hotmail.com>
wrote:

> Hi Till,
>
> That's great! thank you so much!!! I have spent one week on this. I'm so
> relieved!
>
> Cheers
>
> s

Re: How to use properly the function: withTimestampAssigner((event, timestamp) ->..

Posted by Simone Cavallarin <ca...@hotmail.com>.
Hi Till,

That's great! thank you so much!!! I have spent one week on this. I'm so relieved!

Cheers

s

