You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Flink Lover <fl...@gmail.com> on 2022/01/06 17:13:18 UTC

Trigger the producer to send data to the consumer after mentioned seconds

Hello Folks!

I have a DataStream which sends data to the consumer but I got the data
once the code completed its execution. I didn't receive the records as the
code was writing it to the topic. I was able to achieve this behavior using
AT_LEAST_ONCE property but I decided to implement Watermarks. I haven't
enabled checkpointing as of now. I know checkpointing will also do the
trick.  My expectation is Producer should batch the records of 2 seconds
and send it to the consumer and consumer should receive a batch of 2
seconds. My code goes as below:

*Producer Side*:
 dataToKafka.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
    dataToKafka.addSink(myProducer).uid("source")

*Consumer Side*:
consumer.assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))

Now this gives me an error as below:

Static methods in interface require -target:jvm-1.8
        .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))

My scala version is 2.11.12 and Java JDK 1.8.0.281

Thanks,
Martin.

Re: Trigger the producer to send data to the consumer after mentioned seconds

Posted by David Morávek <dm...@apache.org>.
you’re using compile target lower then 1.8, what needs to be done depends
on your build tool

On Fri 7. 1. 2022 at 20:05, Flink Lover <fl...@gmail.com> wrote:

> Hi David,
>
> Thanks for your explanation!
>
> I am familiar with how JVM works but why am I facing this issue? What
> exactly needs to be done?
>
> Thanks,
> Martin O.
>
> On Sat, Jan 8, 2022 at 12:19 AM David Morávek <dm...@apache.org> wrote:
>
>> Hi Siddhesh,
>>
>> any JVM based language (Java, Scala, Kotlin) compiles into a byte-code
>> that can be executed by the JVM. As the JVM was evolving over the years,
>> new versions of byte code have been introduced. Target version simply
>> refers the the version of bytecode the compiler should generate. How to
>> specify it depends on the used build tool, but in general it all boils down
>> to running java compiler with specified "-target" [1].
>>
>> [1]
>> https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html
>>
>> D.
>>
>> On Fri, Jan 7, 2022 at 6:22 PM Flink Lover <fl...@gmail.com>
>> wrote:
>>
>>> Could you please help me with this?
>>>
>>> On Fri, Jan 7, 2022 at 11:48 AM Flink Lover <fl...@gmail.com>
>>> wrote:
>>>
>>>> I tried Flink version 1.14.2 / 1.13.5
>>>>
>>>> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover <fl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Also, I am using flink-connector-kafka_2.11
>>>>>
>>>>> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
>>>>> SimpleStringSchema(), properties)
>>>>>
>>>>>
>>>>> val myProducer = new FlinkKafkaProducer[String](
>>>>>       "topic_name", // target topic
>>>>>       new KeyedSerializationSchemaWrapper[String](new
>>>>> SimpleStringSchema()), // serialization schema
>>>>>       getProperties(), // producer config
>>>>>       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <fl...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I checked the Java version using the java -version on the terminal
>>>>>> and it gave me 1.8.0.281. Also, the project has been compiled using JDK 8
>>>>>> only which is by default.
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> What do you mean by target jvm? Also, what I am trying to achieve is
>>>>>> correct? about the windows?
>>>>>>
>>>>>> Thanks,
>>>>>> Martin
>>>>>>
>>>>>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <re...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Martin,
>>>>>>>
>>>>>>> Can you provide the configuration of your Kafka producer and
>>>>>>> consumer? Also it’ll be helpful to have the complete code of your
>>>>>>> DataStream.
>>>>>>>
>>>>>>> About the error you mentioned, I doubt that the JDK version you
>>>>>>> actually use is probably below 1.8. Can you have a double check on the
>>>>>>> environment that your job is running in?
>>>>>>>
>>>>>>> Cheers,
>>>>>>>
>>>>>>> Qingsheng Ren
>>>>>>>
>>>>>>>
>>>>>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <fl...@gmail.com>
>>>>>>> wrote:
>>>>>>> >
>>>>>>> > Hello Folks!
>>>>>>> >
>>>>>>> > I have a DataStream which sends data to the consumer but I got the
>>>>>>> data once the code completed its execution. I didn't receive the records as
>>>>>>> the code was writing it to the topic. I was able to achieve this behavior
>>>>>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>>>>>> haven't enabled checkpointing as of now. I know checkpointing will also do
>>>>>>> the trick.  My expectation is Producer should batch the records of 2
>>>>>>> seconds and send it to the consumer and consumer should receive a batch of
>>>>>>> 2 seconds. My code goes as below:
>>>>>>> >
>>>>>>> > Producer Side:
>>>>>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>>>>>> >       WatermarkStrategy
>>>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>>> >     dataToKafka.addSink(myProducer).uid("source")
>>>>>>> >
>>>>>>> > Consumer Side:
>>>>>>> > consumer.assignTimestampsAndWatermarks(
>>>>>>> >       WatermarkStrategy
>>>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>>> >
>>>>>>> > Now this gives me an error as below:
>>>>>>> >
>>>>>>> > Static methods in interface require -target:jvm-1.8
>>>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>>> >
>>>>>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>>>>>> >
>>>>>>> > Thanks,
>>>>>>> > Martin.
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>>
>>>>>>>

Re: Trigger the producer to send data to the consumer after mentioned seconds

Posted by Flink Lover <fl...@gmail.com>.
Hi David,

Thanks for your explanation!

I am familiar with how JVM works but why am I facing this issue? What
exactly needs to be done?

Thanks,
Martin O.

On Sat, Jan 8, 2022 at 12:19 AM David Morávek <dm...@apache.org> wrote:

> Hi Siddhesh,
>
> any JVM based language (Java, Scala, Kotlin) compiles into a byte-code
> that can be executed by the JVM. As the JVM was evolving over the years,
> new versions of byte code have been introduced. Target version simply
> refers the the version of bytecode the compiler should generate. How to
> specify it depends on the used build tool, but in general it all boils down
> to running java compiler with specified "-target" [1].
>
> [1]
> https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html
>
> D.
>
> On Fri, Jan 7, 2022 at 6:22 PM Flink Lover <fl...@gmail.com> wrote:
>
>> Could you please help me with this?
>>
>> On Fri, Jan 7, 2022 at 11:48 AM Flink Lover <fl...@gmail.com>
>> wrote:
>>
>>> I tried Flink version 1.14.2 / 1.13.5
>>>
>>> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover <fl...@gmail.com>
>>> wrote:
>>>
>>>> Also, I am using flink-connector-kafka_2.11
>>>>
>>>> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
>>>> SimpleStringSchema(), properties)
>>>>
>>>>
>>>> val myProducer = new FlinkKafkaProducer[String](
>>>>       "topic_name", // target topic
>>>>       new KeyedSerializationSchemaWrapper[String](new
>>>> SimpleStringSchema()), // serialization schema
>>>>       getProperties(), // producer config
>>>>       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>>>>
>>>>
>>>>
>>>> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <fl...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I checked the Java version using the java -version on the terminal and
>>>>> it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
>>>>> which is by default.
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> What do you mean by target jvm? Also, what I am trying to achieve is
>>>>> correct? about the windows?
>>>>>
>>>>> Thanks,
>>>>> Martin
>>>>>
>>>>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <re...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Martin,
>>>>>>
>>>>>> Can you provide the configuration of your Kafka producer and
>>>>>> consumer? Also it’ll be helpful to have the complete code of your
>>>>>> DataStream.
>>>>>>
>>>>>> About the error you mentioned, I doubt that the JDK version you
>>>>>> actually use is probably below 1.8. Can you have a double check on the
>>>>>> environment that your job is running in?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Qingsheng Ren
>>>>>>
>>>>>>
>>>>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <fl...@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > Hello Folks!
>>>>>> >
>>>>>> > I have a DataStream which sends data to the consumer but I got the
>>>>>> data once the code completed its execution. I didn't receive the records as
>>>>>> the code was writing it to the topic. I was able to achieve this behavior
>>>>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>>>>> haven't enabled checkpointing as of now. I know checkpointing will also do
>>>>>> the trick.  My expectation is Producer should batch the records of 2
>>>>>> seconds and send it to the consumer and consumer should receive a batch of
>>>>>> 2 seconds. My code goes as below:
>>>>>> >
>>>>>> > Producer Side:
>>>>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>>>>> >       WatermarkStrategy
>>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>> >     dataToKafka.addSink(myProducer).uid("source")
>>>>>> >
>>>>>> > Consumer Side:
>>>>>> > consumer.assignTimestampsAndWatermarks(
>>>>>> >       WatermarkStrategy
>>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>> >
>>>>>> > Now this gives me an error as below:
>>>>>> >
>>>>>> > Static methods in interface require -target:jvm-1.8
>>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>>> >
>>>>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>>>>> >
>>>>>> > Thanks,
>>>>>> > Martin.
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>>

Re: Trigger the producer to send data to the consumer after mentioned seconds

Posted by David Morávek <dm...@apache.org>.
Hi Siddhesh,

any JVM based language (Java, Scala, Kotlin) compiles into a byte-code that
can be executed by the JVM. As the JVM was evolving over the years, new
versions of byte code have been introduced. Target version simply refers
the the version of bytecode the compiler should generate. How to specify it
depends on the used build tool, but in general it all boils down to running
java compiler with specified "-target" [1].

[1] https://docs.oracle.com/javase/8/docs/technotes/tools/windows/javac.html

D.

On Fri, Jan 7, 2022 at 6:22 PM Flink Lover <fl...@gmail.com> wrote:

> Could you please help me with this?
>
> On Fri, Jan 7, 2022 at 11:48 AM Flink Lover <fl...@gmail.com>
> wrote:
>
>> I tried Flink version 1.14.2 / 1.13.5
>>
>> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover <fl...@gmail.com>
>> wrote:
>>
>>> Also, I am using flink-connector-kafka_2.11
>>>
>>> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
>>> SimpleStringSchema(), properties)
>>>
>>>
>>> val myProducer = new FlinkKafkaProducer[String](
>>>       "topic_name", // target topic
>>>       new KeyedSerializationSchemaWrapper[String](new
>>> SimpleStringSchema()), // serialization schema
>>>       getProperties(), // producer config
>>>       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>>>
>>>
>>>
>>> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <fl...@gmail.com>
>>> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I checked the Java version using the java -version on the terminal and
>>>> it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
>>>> which is by default.
>>>>
>>>> [image: image.png]
>>>>
>>>> What do you mean by target jvm? Also, what I am trying to achieve is
>>>> correct? about the windows?
>>>>
>>>> Thanks,
>>>> Martin
>>>>
>>>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <re...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Martin,
>>>>>
>>>>> Can you provide the configuration of your Kafka producer and consumer?
>>>>> Also it’ll be helpful to have the complete code of your DataStream.
>>>>>
>>>>> About the error you mentioned, I doubt that the JDK version you
>>>>> actually use is probably below 1.8. Can you have a double check on the
>>>>> environment that your job is running in?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Qingsheng Ren
>>>>>
>>>>>
>>>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <fl...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> > Hello Folks!
>>>>> >
>>>>> > I have a DataStream which sends data to the consumer but I got the
>>>>> data once the code completed its execution. I didn't receive the records as
>>>>> the code was writing it to the topic. I was able to achieve this behavior
>>>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>>>> haven't enabled checkpointing as of now. I know checkpointing will also do
>>>>> the trick.  My expectation is Producer should batch the records of 2
>>>>> seconds and send it to the consumer and consumer should receive a batch of
>>>>> 2 seconds. My code goes as below:
>>>>> >
>>>>> > Producer Side:
>>>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>>>> >       WatermarkStrategy
>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>> >     dataToKafka.addSink(myProducer).uid("source")
>>>>> >
>>>>> > Consumer Side:
>>>>> > consumer.assignTimestampsAndWatermarks(
>>>>> >       WatermarkStrategy
>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>> >
>>>>> > Now this gives me an error as below:
>>>>> >
>>>>> > Static methods in interface require -target:jvm-1.8
>>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>>> >
>>>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>>>> >
>>>>> > Thanks,
>>>>> > Martin.
>>>>> >
>>>>> >
>>>>> >
>>>>>
>>>>>

Re: Trigger the producer to send data to the consumer after mentioned seconds

Posted by Flink Lover <fl...@gmail.com>.
Could you please help me with this?

On Fri, Jan 7, 2022 at 11:48 AM Flink Lover <fl...@gmail.com> wrote:

> I tried Flink version 1.14.2 / 1.13.5
>
> On Fri, Jan 7, 2022 at 11:46 AM Flink Lover <fl...@gmail.com>
> wrote:
>
>> Also, I am using flink-connector-kafka_2.11
>>
>> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
>> SimpleStringSchema(), properties)
>>
>>
>> val myProducer = new FlinkKafkaProducer[String](
>>       "topic_name", // target topic
>>       new KeyedSerializationSchemaWrapper[String](new
>> SimpleStringSchema()), // serialization schema
>>       getProperties(), // producer config
>>       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>>
>>
>>
>> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <fl...@gmail.com>
>> wrote:
>>
>>> Hi All,
>>>
>>> I checked the Java version using the java -version on the terminal and
>>> it gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
>>> which is by default.
>>>
>>> [image: image.png]
>>>
>>> What do you mean by target jvm? Also, what I am trying to achieve is
>>> correct? about the windows?
>>>
>>> Thanks,
>>> Martin
>>>
>>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <re...@gmail.com> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> Can you provide the configuration of your Kafka producer and consumer?
>>>> Also it’ll be helpful to have the complete code of your DataStream.
>>>>
>>>> About the error you mentioned, I doubt that the JDK version you
>>>> actually use is probably below 1.8. Can you have a double check on the
>>>> environment that your job is running in?
>>>>
>>>> Cheers,
>>>>
>>>> Qingsheng Ren
>>>>
>>>>
>>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <fl...@gmail.com>
>>>> wrote:
>>>> >
>>>> > Hello Folks!
>>>> >
>>>> > I have a DataStream which sends data to the consumer but I got the
>>>> data once the code completed its execution. I didn't receive the records as
>>>> the code was writing it to the topic. I was able to achieve this behavior
>>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>>> haven't enabled checkpointing as of now. I know checkpointing will also do
>>>> the trick.  My expectation is Producer should batch the records of 2
>>>> seconds and send it to the consumer and consumer should receive a batch of
>>>> 2 seconds. My code goes as below:
>>>> >
>>>> > Producer Side:
>>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>>> >       WatermarkStrategy
>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>> >     dataToKafka.addSink(myProducer).uid("source")
>>>> >
>>>> > Consumer Side:
>>>> > consumer.assignTimestampsAndWatermarks(
>>>> >       WatermarkStrategy
>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>> >
>>>> > Now this gives me an error as below:
>>>> >
>>>> > Static methods in interface require -target:jvm-1.8
>>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>>> >
>>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>>> >
>>>> > Thanks,
>>>> > Martin.
>>>> >
>>>> >
>>>> >
>>>>
>>>>

Re: Trigger the producer to send data to the consumer after mentioned seconds

Posted by Flink Lover <fl...@gmail.com>.
I tried Flink version 1.14.2 / 1.13.5

On Fri, Jan 7, 2022 at 11:46 AM Flink Lover <fl...@gmail.com> wrote:

> Also, I am using flink-connector-kafka_2.11
>
> val consumer = new FlinkKafkaConsumer[String]("topic_name", new
> SimpleStringSchema(), properties)
>
>
> val myProducer = new FlinkKafkaProducer[String](
>       "topic_name", // target topic
>       new KeyedSerializationSchemaWrapper[String](new
> SimpleStringSchema()), // serialization schema
>       getProperties(), // producer config
>       FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
>
>
>
> On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <fl...@gmail.com>
> wrote:
>
>> Hi All,
>>
>> I checked the Java version using the java -version on the terminal and it
>> gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
>> which is by default.
>>
>> [image: image.png]
>>
>> What do you mean by target jvm? Also, what I am trying to achieve is
>> correct? about the windows?
>>
>> Thanks,
>> Martin
>>
>> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <re...@gmail.com> wrote:
>>
>>> Hi Martin,
>>>
>>> Can you provide the configuration of your Kafka producer and consumer?
>>> Also it’ll be helpful to have the complete code of your DataStream.
>>>
>>> About the error you mentioned, I doubt that the JDK version you actually
>>> use is probably below 1.8. Can you have a double check on the environment
>>> that your job is running in?
>>>
>>> Cheers,
>>>
>>> Qingsheng Ren
>>>
>>>
>>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <fl...@gmail.com>
>>> wrote:
>>> >
>>> > Hello Folks!
>>> >
>>> > I have a DataStream which sends data to the consumer but I got the
>>> data once the code completed its execution. I didn't receive the records as
>>> the code was writing it to the topic. I was able to achieve this behavior
>>> using AT_LEAST_ONCE property but I decided to implement Watermarks. I
>>> haven't enabled checkpointing as of now. I know checkpointing will also do
>>> the trick.  My expectation is Producer should batch the records of 2
>>> seconds and send it to the consumer and consumer should receive a batch of
>>> 2 seconds. My code goes as below:
>>> >
>>> > Producer Side:
>>> >  dataToKafka.assignTimestampsAndWatermarks(
>>> >       WatermarkStrategy
>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> >     dataToKafka.addSink(myProducer).uid("source")
>>> >
>>> > Consumer Side:
>>> > consumer.assignTimestampsAndWatermarks(
>>> >       WatermarkStrategy
>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> >
>>> > Now this gives me an error as below:
>>> >
>>> > Static methods in interface require -target:jvm-1.8
>>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>>> >
>>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>>> >
>>> > Thanks,
>>> > Martin.
>>> >
>>> >
>>> >
>>>
>>>

Re: Trigger the producer to send data to the consumer after mentioned seconds

Posted by Flink Lover <fl...@gmail.com>.
Also, I am using flink-connector-kafka_2.11

val consumer = new FlinkKafkaConsumer[String]("topic_name", new
SimpleStringSchema(), properties)


val myProducer = new FlinkKafkaProducer[String](
      "topic_name", // target topic
      new KeyedSerializationSchemaWrapper[String](new
SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)



On Fri, Jan 7, 2022 at 11:43 AM Flink Lover <fl...@gmail.com> wrote:

> Hi All,
>
> I checked the Java version using the java -version on the terminal and it
> gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
> which is by default.
>
> [image: image.png]
>
> What do you mean by target jvm? Also, what I am trying to achieve is
> correct? about the windows?
>
> Thanks,
> Martin
>
> On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <re...@gmail.com> wrote:
>
>> Hi Martin,
>>
>> Can you provide the configuration of your Kafka producer and consumer?
>> Also it’ll be helpful to have the complete code of your DataStream.
>>
>> About the error you mentioned, I doubt that the JDK version you actually
>> use is probably below 1.8. Can you have a double check on the environment
>> that your job is running in?
>>
>> Cheers,
>>
>> Qingsheng Ren
>>
>>
>> > On Jan 7, 2022, at 1:13 AM, Flink Lover <fl...@gmail.com> wrote:
>> >
>> > Hello Folks!
>> >
>> > I have a DataStream which sends data to the consumer but I got the data
>> once the code completed its execution. I didn't receive the records as the
>> code was writing it to the topic. I was able to achieve this behavior using
>> AT_LEAST_ONCE property but I decided to implement Watermarks. I haven't
>> enabled checkpointing as of now. I know checkpointing will also do the
>> trick.  My expectation is Producer should batch the records of 2 seconds
>> and send it to the consumer and consumer should receive a batch of 2
>> seconds. My code goes as below:
>> >
>> > Producer Side:
>> >  dataToKafka.assignTimestampsAndWatermarks(
>> >       WatermarkStrategy
>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> >     dataToKafka.addSink(myProducer).uid("source")
>> >
>> > Consumer Side:
>> > consumer.assignTimestampsAndWatermarks(
>> >       WatermarkStrategy
>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> >
>> > Now this gives me an error as below:
>> >
>> > Static methods in interface require -target:jvm-1.8
>> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
>> >
>> > My scala version is 2.11.12 and Java JDK 1.8.0.281
>> >
>> > Thanks,
>> > Martin.
>> >
>> >
>> >
>>
>>

Re: Trigger the producer to send data to the consumer after mentioned seconds

Posted by Flink Lover <fl...@gmail.com>.
Hi All,

I checked the Java version using the java -version on the terminal and it
gave me 1.8.0.281. Also, the project has been compiled using JDK 8 only
which is by default.

[image: image.png]

What do you mean by target jvm? Also, what I am trying to achieve is
correct? about the windows?

Thanks,
Martin

On Fri, Jan 7, 2022 at 8:07 AM Qingsheng Ren <re...@gmail.com> wrote:

> Hi Martin,
>
> Can you provide the configuration of your Kafka producer and consumer?
> Also it’ll be helpful to have the complete code of your DataStream.
>
> About the error you mentioned, I doubt that the JDK version you actually
> use is probably below 1.8. Can you have a double check on the environment
> that your job is running in?
>
> Cheers,
>
> Qingsheng Ren
>
>
> > On Jan 7, 2022, at 1:13 AM, Flink Lover <fl...@gmail.com> wrote:
> >
> > Hello Folks!
> >
> > I have a DataStream which sends data to the consumer but I got the data
> once the code completed its execution. I didn't receive the records as the
> code was writing it to the topic. I was able to achieve this behavior using
> AT_LEAST_ONCE property but I decided to implement Watermarks. I haven't
> enabled checkpointing as of now. I know checkpointing will also do the
> trick.  My expectation is Producer should batch the records of 2 seconds
> and send it to the consumer and consumer should receive a batch of 2
> seconds. My code goes as below:
> >
> > Producer Side:
> >  dataToKafka.assignTimestampsAndWatermarks(
> >       WatermarkStrategy
> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> >     dataToKafka.addSink(myProducer).uid("source")
> >
> > Consumer Side:
> > consumer.assignTimestampsAndWatermarks(
> >       WatermarkStrategy
> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> >
> > Now this gives me an error as below:
> >
> > Static methods in interface require -target:jvm-1.8
> >         .forBoundedOutOfOrderness[String](Duration.ofSeconds(2)))
> >
> > My scala version is 2.11.12 and Java JDK 1.8.0.281
> >
> > Thanks,
> > Martin.
> >
> >
> >
>
>