Posted to user@flink.apache.org by Archit Mittal <ma...@gmail.com> on 2017/04/02 10:38:00 UTC
Doubt Regarding producing to kafka using flink
Hi
I am using flink-connector-kafka-0.10_2.10.
While producing to Kafka, I get the following error:
java.lang.IllegalArgumentException: Invalid timestamp -9223372036854775808
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60) ~[kafka-clients-0.10.0.1.jar:na]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invokeInternal(FlinkKafkaProducer010.java:249) ~[flink-connector-kafka-0.10_2.10-1.2.0.jar:1.2.0]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.processElement(FlinkKafkaProducer010.java:345)
How do I put a timestamp in my object before producing?
Thanks
Archit
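A note on the value in the stack trace: -9223372036854775808 is Long.MIN_VALUE, which appears to be what the sink passes along when a record carries no timestamp, and the Kafka client rejects negative timestamps. The sketch below only imitates that behaviour in plain Java; `requireValidTimestamp` is a hypothetical stand-in for the check in `ProducerRecord`, not the actual Kafka source.

```java
class InvalidTimestampDemo {

    // Hypothetical stand-in for the validation that raises the error in the
    // stack trace: a non-null, negative timestamp is rejected.
    static void requireValidTimestamp(Long timestamp) {
        if (timestamp != null && timestamp < 0) {
            throw new IllegalArgumentException("Invalid timestamp " + timestamp);
        }
    }

    public static void main(String[] args) {
        // Assumption: a record without an assigned timestamp is reported
        // with Long.MIN_VALUE as its timestamp.
        long missingTimestamp = Long.MIN_VALUE;

        try {
            requireValidTimestamp(missingTimestamp);
        } catch (IllegalArgumentException e) {
            // Same message text as in the reported error.
            System.out.println(e.getMessage());
        }

        // A wall-clock timestamp is non-negative and passes the check.
        requireValidTimestamp(System.currentTimeMillis());
    }
}
```

Read this way, the exception suggests that the records reaching the Kafka sink never had a timestamp attached, rather than that a wrong timestamp was computed.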
Re: Doubt Regarding producing to kafka using flink
Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Archit,
The problem is that you need to assign the returned `DataStream` from `stream.assignTimestampsAndWatermarks` to a separate variable, and use that when instantiating the Kafka 0.10 sink.
The `assignTimestampsAndWatermarks` method returns a new `DataStream` instance with records that have assigned timestamps. Calling it does not affect the original `DataStream` instance.
Cheers,
Gordon
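The advice above can be sketched roughly as follows, assuming Flink 1.2 with flink-connector-kafka-0.10. The `WordCount` and `WordCountSchema` classes, the topic name, and the sample element are hypothetical stand-ins, since the thread does not show them, and `System.currentTimeMillis()` replaces the Joda `DateTime.now().getMillis()` call only to avoid an extra dependency. The one change that matters is that the stream returned by `assignTimestampsAndWatermarks` is kept in a variable and handed to the sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

class TimestampedKafkaJob {

    // Hypothetical stand-in for the WordCount type used in the thread.
    public static class WordCount {
        public String word;
        public long count;

        public WordCount() {}

        public WordCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
    }

    // Hypothetical stand-in for WordCountSchema: writes "word,count" as UTF-8.
    public static class WordCountSchema implements SerializationSchema<WordCount> {
        @Override
        public byte[] serialize(WordCount element) {
            return (element.word + "," + element.count).getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        DataStream<WordCount> stream = env.fromElements(new WordCount("hello", 1L));

        // Keep the returned stream: the original `stream` is left unchanged.
        DataStream<WordCount> withTimestamps = stream.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<WordCount>() {
                    @Override
                    public long extractAscendingTimestamp(WordCount element) {
                        return System.currentTimeMillis();
                    }
                });

        // Pass the timestamped stream, not the original one, to the Kafka sink.
        FlinkKafkaProducer010.FlinkKafkaProducer010Configuration<WordCount> config =
                FlinkKafkaProducer010.writeToKafkaWithTimestamps(
                        withTimestamps, "word-count-topic", new WordCountSchema(), properties);
        config.setWriteTimestampToKafka(true);

        env.execute("job");
    }
}
```

Running this needs the Flink and Kafka connector dependencies on the classpath and a broker reachable at the configured address, so treat it as an outline to adapt rather than a tested job.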
On April 3, 2017 at 5:15:03 PM, Archit Mittal (marchit51@gmail.com) wrote:
Hi Gordon
This is the snippet I am using, but I still get the invalid timestamp error:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "word");
    properties.setProperty("auto.offset.reset", "earliest");

    DataStream<WordCount> stream = env.fromElements(wordCount);
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WordCount>() {
        @Override
        public long extractAscendingTimestamp(WordCount element) {
            return DateTime.now().getMillis();
        }
    });

    FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, KAFKA_TOPIC, new WordCountSchema(), properties);
    config.setWriteTimestampToKafka(true);
    env.execute("job");
Re: Doubt Regarding producing to kafka using flink
Posted by Archit Mittal <ma...@gmail.com>.
Hi Gordon
This is the snippet I am using, but I still get the invalid timestamp error:

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "word");
    properties.setProperty("auto.offset.reset", "earliest");

    DataStream<WordCount> stream = env.fromElements(wordCount);
    stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<WordCount>() {
        @Override
        public long extractAscendingTimestamp(WordCount element) {
            return DateTime.now().getMillis();
        }
    });

    FlinkKafkaProducer010.FlinkKafkaProducer010Configuration config =
        FlinkKafkaProducer010.writeToKafkaWithTimestamps(stream, KAFKA_TOPIC, new WordCountSchema(), properties);
    config.setWriteTimestampToKafka(true);
    env.execute("job");
Re: Doubt Regarding producing to kafka using flink
Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Archit!
You’ll need to assign timestamps to the records in your stream before producing them to Kafka (i.e. before the FlinkKafkaProducer operator).
Have a look at [1] and [2] on how to do that. Feel free to ask further questions if you bump into any!
Cheers,
Gordon
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamps_watermarks.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_timestamp_extractors.html