Posted to user@flink.apache.org by Rahul Raj <ra...@gmail.com> on 2017/10/11 04:19:34 UTC

Windows getting created only on first execution

Hi,

I have written a program which reads data from Kafka, parses the JSON, and
does a reduce operation. The problem I am facing is that the program runs
perfectly the first time on a given day, but when I kill it and run it
again, only an empty output file is created. Even after recompiling and
rerunning, the file is still empty.

var kafkaConsumer = new FlinkKafkaConsumer08(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

var messageStream = env.addSource(kafkaConsumer).filter(t => t.contains(pattern))

var mts = messageStream.assignTimestampsAndWatermarks(
  new AssignerWithPeriodicWatermarks[String] {

    var ts = Long.MinValue

    override def extractTimestamp(element: String, previousElementTimestamp: Long): Long = {
      val timestamp = json_decode(element).toLong
      ts = Math.max(timestamp, previousElementTimestamp)
      timestamp
    }

    override def getCurrentWatermark(): Watermark = new Watermark(ts)
  })

var output = mts
  .keyBy(t => json_decode(t))
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .allowedLateness(Time.seconds(5))
  .reduce((v1, v2) => v1 + "----" + v2)

output.writeAsText(path).setParallelism(1)


I am using the filesystem state backend (FsStateBackend). I am assuming
this problem is related to memory cleanup, but I don't understand what's
happening.

Any help?


Rahul Raj

Re: Windows getting created only on first execution

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

When you are restoring from a savepoint (or checkpoint), the offsets in Kafka are completely ignored. Flink checkpoints the offsets at the time the checkpoint/savepoint is taken, and those are used as the read offsets when restoring.
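
For example, a rough sketch (untested, reusing the consumer setup from your code):

// Explicit start positions only apply to fresh starts without restored state.
val consumer = new FlinkKafkaConsumer08[String](
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)
consumer.setStartFromEarliest()

// When you resume with, e.g., "flink run -s <savepoint-path> job.jar",
// the consumer continues from the offsets stored in the savepoint and
// ignores both setStartFromEarliest() and the offsets committed in Kafka.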

Best,
Aljoscha

> On 11. Oct 2017, at 12:58, Rahul Raj <ra...@gmail.com> wrote:
> 
> Changing the group id didn't work for me; instead, using setStartFromEarliest() on the Kafka consumer worked. But it created one confusion: in case of failure, if I start from a particular checkpoint or savepoint, will the application start reading messages from the offset where the checkpoint/savepoint was created, or will it start reading from the first record in the Kafka partition?
> 
> Rahul Raj 


Re: Windows getting created only on first execution

Posted by Rahul Raj <ra...@gmail.com>.
Changing the group id didn't work for me; instead, using
setStartFromEarliest() on the Kafka consumer worked. But it created one
confusion: in case of failure, if I start from a particular checkpoint or
savepoint, will the application start reading messages from the offset
where the checkpoint/savepoint was created, or will it start reading from
the first record in the Kafka partition?
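
For reference, the change that worked for me was roughly just:

kafkaConsumer.setStartFromEarliest()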

Rahul Raj

On 11 October 2017 at 15:44, Aljoscha Krettek <al...@apache.org> wrote:

> Hi,
>
> I think the problem is that your Kafka consumer has the same group-id
> across those two runs. This means that it will pick up the last "read
> position" of the previous run, and thus not read anything. If you change
> the group-id for the second run you should be able to read your data again.
>
> Best,
> Aljoscha

Re: Windows getting created only on first execution

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think the problem is that your Kafka consumer has the same group-id across those two runs. This means that it will pick up the last "read position" of the previous run, and thus not read anything. If you change the group-id for the second run you should be able to read your data again.
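
For example, something like this (a rough sketch; the group id value is just a placeholder):

val props = params.getProperties
// A group.id with no previously committed offsets, so the consumer
// does not resume from the last run's read position.
props.setProperty("group.id", "my-job-second-run")

val kafkaConsumer = new FlinkKafkaConsumer08[String](
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  props)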

Best,
Aljoscha

> On 11. Oct 2017, at 06:19, Rahul Raj <ra...@gmail.com> wrote:
> 
> Hi,
> 
> I have written a program which reads data from Kafka, parses the JSON, and does a reduce operation. The problem I am facing is that the program runs perfectly the first time on a given day, but when I kill it and run it again, only an empty output file is created. Even after recompiling and rerunning, the file is still empty.