Posted to user@flink.apache.org by Rahul Raj <ra...@gmail.com> on 2017/10/11 04:19:34 UTC
Windows getting created only on first execution
Hi,
I have written a program that reads data from Kafka, parses the JSON, and
performs a reduce operation. The problem I am facing is that the program
runs correctly the first time on a given day, but when I kill the program
and run it again, it creates an empty file. Even after recompiling and
running it again, the output file is empty.
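// Imports assume the Flink 1.3.x Scala API; env, params, pattern, path
// and the helper json_decode are defined elsewhere in the program.
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

val kafkaConsumer = new FlinkKafkaConsumer08[String](
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val messageStream =
  env.addSource(kafkaConsumer).filter(t => t.contains(pattern))

val mts = messageStream.assignTimestampsAndWatermarks(
  new AssignerWithPeriodicWatermarks[String] {
    // Highest event timestamp seen so far
    var ts = Long.MinValue

    override def extractTimestamp(element: String,
                                  previousElementTimestamp: Long): Long = {
      val timestamp = json_decode(element).toLong
      // Track the running maximum; the 0.8 consumer attaches no Kafka record
      // timestamps, so previousElementTimestamp is not useful here
      ts = math.max(ts, timestamp)
      timestamp
    }

    // Called periodically; the watermark is the max timestamp seen so far
    override def getCurrentWatermark(): Watermark = new Watermark(ts)
  })

val output = mts
  .keyBy(t => json_decode(t))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .allowedLateness(Time.seconds(5))
  .reduce((v1, v2) => v1 + "----" + v2)

output.writeAsText(path).setParallelism(1)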
I am using the FileSystem state backend. I assume this problem is related
to memory cleanup, but I don't understand what is happening.
Any help?
Rahul Raj
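A minimal plain-Scala sketch of the event-time logic in this job may help show when output appears. It is a simplified model, not Flink's implementation: it assumes a single key, ignores allowedLateness, and the names MaxTimestampAssigner, SessionModel, sessions and firedSessions are hypothetical. As with Flink's session windows, each element starts a window [ts, ts + gap), overlapping windows are merged, and a merged window is emitted once the watermark reaches its end minus one millisecond. The watermark here is the highest timestamp seen so far, so a run that reads no records never fires a window.

```scala
// Hypothetical model of a periodic "max timestamp so far" watermark assigner
final class MaxTimestampAssigner {
  // Highest event timestamp seen so far; Long.MinValue before any element
  private var maxTs = Long.MinValue

  def extractTimestamp(ts: Long): Long = {
    maxTs = math.max(maxTs, ts)
    ts
  }

  def currentWatermark: Long = maxTs
}

object SessionModel {
  final case class Window(start: Long, end: Long) {
    // An event-time window fires once the watermark reaches end - 1
    def maxTimestamp: Long = end - 1
  }

  // Each element gets [ts, ts + gap); windows that touch or overlap are merged
  def sessions(timestamps: Seq[Long], gapMs: Long): List[Window] =
    timestamps.sorted.foldLeft(List.empty[Window]) {
      case (current :: done, ts) if ts <= current.end =>
        Window(current.start, math.max(current.end, ts + gapMs)) :: done
      case (acc, ts) =>
        Window(ts, ts + gapMs) :: acc
    }.reverse

  // Sessions that the final watermark has already passed, i.e. emitted output
  def firedSessions(timestamps: Seq[Long], gapMs: Long): List[Window] = {
    val assigner = new MaxTimestampAssigner
    timestamps.foreach(assigner.extractTimestamp)
    val watermark = assigner.currentWatermark
    sessions(timestamps, gapMs).filter(w => watermark >= w.maxTimestamp)
  }
}
```

With a 60-second gap and timestamps 0, 30000 and 200000 ms (in any order), the model emits only the session [0, 90000); the session starting at 200000 stays open until later data advances the watermark.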
Re: Windows getting created only on first execution
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
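When you restore from a savepoint (or checkpoint), the group offsets committed to Kafka (and the configured start position) are completely ignored. Flink checkpoints the offsets at the time the checkpoint/savepoint is taken, and those offsets are used as the read positions when restoring. So after a failure, the job resumes from the offsets recorded in the checkpoint/savepoint, not from the first record in the partition.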
Best,
Aljoscha
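The rule described above can be sketched as a precedence check: offsets restored from a checkpoint or savepoint take priority, and the configured start position (setStartFromEarliest(), setStartFromLatest(), or the default group offsets) only matters when the job starts without restored state. This is a hypothetical plain-Scala model for a single partition, not Flink's code; StartupMode, PartitionInfo and StartPosition.resolve are made-up names, and the fallback for a group with no committed offset is assumed to be the latest offset.

```scala
// Hypothetical model of how a restarted Kafka source picks its read position
sealed trait StartupMode
object StartupMode {
  case object GroupOffsets extends StartupMode // default: committed group offsets
  case object Earliest extends StartupMode     // models setStartFromEarliest()
  case object Latest extends StartupMode       // models setStartFromLatest()
}

final case class PartitionInfo(
    earliest: Long,          // first offset still retained in the partition
    latest: Long,            // offset the next produced record will get
    committed: Option[Long]) // offset committed for the consumer group, if any

object StartPosition {
  // restored: read position taken from a checkpoint/savepoint, if any
  def resolve(restored: Option[Long], mode: StartupMode, p: PartitionInfo): Long =
    restored match {
      // Restored state always wins; the startup mode is ignored
      case Some(offset) => offset
      case None =>
        mode match {
          case StartupMode.Earliest     => p.earliest
          case StartupMode.Latest       => p.latest
          // Assumption: with nothing committed, fall back to the latest offset
          case StartupMode.GroupOffsets => p.committed.getOrElse(p.latest)
        }
    }
}
```

For example, with a partition spanning offsets 0 to 100, resolve(Some(42L), StartupMode.Earliest, p) returns 42 even though the earliest offset is 0.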
> On 11. Oct 2017, at 12:58, Rahul Raj <ra...@gmail.com> wrote:
Re: Windows getting created only on first execution
Posted by Rahul Raj <ra...@gmail.com>.
Changing the group id didn't work for me; instead, calling
setStartFromEarliest() on the Kafka consumer worked. But this raised a
question: in case of a failure, if I restart from a particular checkpoint
or savepoint, will the application start reading messages from the offset
at which the checkpoint/savepoint was created, or from the first record in
the Kafka partition?
Rahul Raj
On 11 October 2017 at 15:44, Aljoscha Krettek <al...@apache.org> wrote:
Re: Windows getting created only on first execution
Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
I think the problem is that your Kafka consumer uses the same group id across those two runs. This means it picks up the last committed "read position" (offset) of the previous run, and thus reads nothing new. If you change the group id for the second run, you should be able to read your data again.
Best,
Aljoscha
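The effect described above can be sketched as a tiny simulation: a run that starts from the consumer group's committed offset and commits its position as it reads leaves nothing for a second run with the same group id. This is a hypothetical model of one partition, not the Kafka or Flink client; ConsumerGroups, run, fromEarliest and autoResetToEarliest are made-up names. The autoResetToEarliest flag stands in for Kafka's auto.offset.reset setting, which decides where a group with no committed offset starts. Assuming it resolves to the latest offset (the Kafka 0.8 consumer default is largest), a fresh group id would also read nothing, which would be consistent with the later report that changing the group id did not help while setStartFromEarliest() did.

```scala
import scala.collection.mutable

// Hypothetical model of consumer-group offsets for one partition
final class ConsumerGroups(logEnd: Long) {
  // Committed read position per group id
  private val committed = mutable.Map.empty[String, Long]

  // Returns how many records one run reads, from its start position to logEnd.
  // fromEarliest models setStartFromEarliest(); otherwise the committed group
  // offset is used, and autoResetToEarliest models auto.offset.reset when the
  // group has no committed offset yet.
  def run(groupId: String,
          fromEarliest: Boolean = false,
          autoResetToEarliest: Boolean = false): Long = {
    val start =
      if (fromEarliest) 0L
      else committed.getOrElse(groupId, if (autoResetToEarliest) 0L else logEnd)
    // The run reads to the end of the log and commits that position
    committed(groupId) = logEnd
    logEnd - start
  }
}
```

On a 1000-record log, calling run("job", autoResetToEarliest = true) and then run("job") on the same instance returns 1000 and then 0; run("job", fromEarliest = true) returns 1000 again.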
> On 11. Oct 2017, at 06:19, Rahul Raj <ra...@gmail.com> wrote: