You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Niels Basjes <Ni...@basjes.nl> on 2020/07/08 06:54:05 UTC

Chaining the creation of a WatermarkStrategy doesn't work?

Hi,

I'm migrating some of my code to Flink 1.11 and I ran into something I find
strange.

This works

WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));

watermarkStrategy
.withTimestampAssigner((SerializableTimestampAssigner<String>)
(element, recordTimestamp) -> 42L);

However this does NOT work

WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
.withTimestampAssigner((SerializableTimestampAssigner<String>)
(element, recordTimestamp) -> 42L);


When I try to compile this last one I get

Error:(109, 13) java: no suitable method found for
withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>)
    method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>)
is not applicable
      (argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>
cannot be converted to
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>)
    method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>)
is not applicable
      (argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>
cannot be converted to
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>)

Why is that?

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Chaining the creation of a WatermarkStrategy doesn't work?

Posted by Niels Basjes <Ni...@basjes.nl>.
Thanks guys,

It is clear this is a Java thing.

Niels

On Wed, Jul 8, 2020 at 9:56 AM Tzu-Li (Gordon) Tai <tz...@apache.org>
wrote:

> Ah, didn't realize Chesnay has it answered already, sorry for the
> concurrent
> reply :)
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes

Re: Chaining the creation of a WatermarkStrategy doesn't work?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Ah, didn't realize Chesnay has it answered already, sorry for the concurrent
reply :)



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Chaining the creation of a WatermarkStrategy doesn't work?

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

This would be more of a Java question.
In short, type inference of generic types does not work for chained
invocations, and therefore type information has to be explicitly included.

If you'd like to chain the calls, this would work:

WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
    .<String>forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
    .withTimestampAssigner((SerializableTimestampAssigner<String>) (element,
recordTimestamp) -> 42L);

Cheers,
Gordon



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Chaining the creation of a WatermarkStrategy doesn't work?

Posted by Chesnay Schepler <ch...@apache.org>.
WatermarkStrategy.forBoundedOutOfOrderness(Duration.of(1, 
ChronoUnit.MINUTES)) returns a WatermarkStrategy<T>, but the exact type 
is entirely dependent on the variable declaration (i.e., it is not 
dependent on any argument).

So, when you assign the strategy to a variable then the compiler can 
infer the generic type. Without a variable it is treated as a 
WatermarkStrategy<Object>, because there is nothing to infer the type from.

On 08/07/2020 08:54, Niels Basjes wrote:
> Hi,
>
> I'm migrating some of my code to Flink 1.11 and I ran into something I 
> find strange.
>
> This works
>
>     WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
>          .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));
>
>     watermarkStrategy      .withTimestampAssigner((SerializableTimestampAssigner<String>) (element, recordTimestamp) -> 42L);
>
> However this does NOT work
>
>     WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
>          .forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))      .withTimestampAssigner((SerializableTimestampAssigner<String>) (element, recordTimestamp) -> 42L);
>
>
> When I try to compile this last one I get
>
> Error:(109, 13) java: no suitable method found for 
> withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>)
>     method 
> org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>) 
> is not applicable
>       (argument mismatch; 
> org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String> 
> cannot be converted to 
> org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>)
>     method 
> org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>) 
> is not applicable
>       (argument mismatch; 
> org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String> 
> cannot be converted to 
> org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>)
>
> Why is that?
>
> -- 
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes