Posted to dev@flink.apache.org by "Dhingra, Kajal" <Ka...@ncr.com.INVALID> on 2021/12/08 14:53:32 UTC

FileSink in Apache Flink not generating logs in output folder

I am new to Flink and doing a POC with it: reading data from a Kafka topic and storing it in files on a server. I am using FileSink to store the files. It creates the directory structure, bucketed by date and hour, but no log files are being created in it.

When I run the program, it creates the directory structure shown below, but no log files are stored there.


/flink/testlogs/2021-12-08--07

/flink/testlogs/2021-12-08--06

I want a new log file to be written every 15 minutes. Below is the code.


DataStream<String> kafkaTopicData = env.addSource(
        new FlinkKafkaConsumer<String>("MyTopic", new SimpleStringSchema(), p));

OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

DataStream<Tuple6<String, String, String, String, String, Integer>> newStream =
        kafkaTopicData.map(new LogParser());

final FileSink<Tuple6<String, String, String, String, String, Integer>> sink =
        FileSink.forRowFormat(
                new Path("/flink/testlogs"),
                new SimpleStringEncoder<Tuple6<String, String, String, String, String, Integer>>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                // roll over to a new part file every 15 minutes,
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                // after 5 minutes without new records,
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                // or once a part file reaches 1 GiB
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
        .withOutputFileConfig(config)
        .build();

newStream.sinkTo(sink);

env.execute("DataReader");

LogParser returns a Tuple6.
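
For context, LogParser would be a MapFunction along these lines (a hypothetical sketch; the actual parsing logic is an assumption here):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple6;

// Hypothetical outline of LogParser; the real field extraction is omitted.
public class LogParser
        implements MapFunction<String, Tuple6<String, String, String, String, String, Integer>> {
    @Override
    public Tuple6<String, String, String, String, String, Integer> map(String line) {
        // Assumed: comma-separated input with six fields, the last one numeric.
        String[] f = line.split(",");
        return Tuple6.of(f[0], f[1], f[2], f[3], f[4], Integer.parseInt(f[5]));
    }
}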

Regards,

Re: FileSink in Apache Flink not generating logs in output folder

Posted by Fabian Paul <fp...@apache.org>.
Hi Kajal,

This indeed looks strange. Are you sure that records are being sent to
the sink? You can verify this by checking the Flink metrics of the
tasks upstream of the sink to see whether they emit anything. The sink
should create a part file as soon as it receives a record, and the
rolling policy should ensure that a new part file is created after 15
minutes.

Note, however, that the FileSink relies on checkpointing: files are
only "fully" written once a checkpoint completes, and part files might
be overwritten after the job fails and recovers from the previous
state.
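
As an illustration, here is a minimal sketch of enabling checkpointing
on the StreamExecutionEnvironment env from your snippet; the 60-second
interval is only an example value. Until a checkpoint completes, part
files stay in the in-progress state, and their dot-prefixed names are
hidden from a plain "ls".

// Checkpointing lets the FileSink finalize its part files;
// the interval below is an example, not a recommendation.
env.enableCheckpointing(TimeUnit.SECONDS.toMillis(60));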

I have cc'ed the user@ list because I think this question is more
suitable for that mailing list.

Best,
Fabian
