You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jose Miguel Tejedor Fernandez <jo...@rovio.com> on 2017/10/06 13:48:17 UTC

Lost data when resuming from savepoint

Hi,

I am running a simple stream Flink job (Flink version 1.3.2 and 1.3.1)
whose source and sink is a Kafka cluster 0.10.0.1.

I am testing savepoints by stopping/resuming the job and when I checked the
validity of the data sunk during the stop time I observed that some of the
events have been lost.

The stream of events is around 6K per 10 minutes and around 50% are lost. I
share the code in case you can indicate me any hint.


Job is resumed correctly from last savepoint and checkpoints configuration
is as follow:

env.setStateBackend(new
FsStateBackend("s3://my_bucket/flink/checkpoints/"));
env.enableCheckpointing(params.getLong("checkpoint.interval", 300000));
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(params.getLong("checkpoint.minPause",
60 * 1000));
env.getCheckpointConfig().setMaxConcurrentCheckpoints(params.getInt("checkpoint.maxConcurrent",
1));
env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
10 * 60 * 1000));
 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);



The Kafka consumer:

        SingleOutputStreamOperator<Map<String, String>>
createKafkaStream(Collection<String> topics, int parallelism, Properties
kafkaProps, StreamExecutionEnvironment env,
            int resetOffsetsTo, String eventTimeField)
        {
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        FlinkKafkaConsumer010<Map<String, String>> consumer = new
FlinkKafkaConsumer010<>(
                new LinkedList<>(topics),
                new EventMapSchema(), kafkaProps);

        DataStream<Map<String, String>> messageStream = env
                .addSource(consumer)
                .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
                .setParallelism(parallelism);

        return messageStream
                // discard events that don't have the event time field
                .filter(new MissingTimeStampFilter(eventTimeField))
                // provide ascending timestamps for
TimeCharacteristic.EventTime
                .assignTimestampsAndWatermarks(new
EventMapTimestampExtractor(eventTimeField));

         }

        ....

        StreamTableUtils.registerTable("events", kafkaStream, fieldNames,
tableEnv);

        String sql = "SELECT\n" +
                "  field_1 AS a,\n" +
                "  field_2 AS b,\n" +
                "  field_3 AS c,\n" +
                "  field_4 AS d,\n" +
                "  field_5 AS e,\n" +
                " FROM events\n" +
                " WHERE field_1 IS NOT NULL";
        LOG.info("sql: {}", sql);


        Table result = tableEnv.sql(sql);
        System.err.println("output fields: " +
Arrays.toString(result.getSchema().getColumnNames()));

        if (printToErr) {≤
            printRows(result, tableEnv);
        }

        if (!StringUtils.isBlank(outputTopic)) {
            TableSink<?> tableSink = new
Kafka09CustomJsonTableSink(outputTopic, KafkaStreams.getProperties(params),
                    new FlinkFixedPartitioner<>(), timestampColumn);
            result.writeToSink(tableSink);
        }

        env.execute();

Cheers

BR


*JM*

Re: Lost data when resuming from savepoint

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Jose,

I had a look at your program but did not spot anything.
The query is a simple "SELECT FROM WHERE" query that does not have any
state.
So the only state is the state of the Kafka source, i.e, the offset.

How much time did pass between taking the savepoint and resuming?
Did you see any exceptions in the log files (TM, JM)?

Thanks, Fabian

2017-10-06 15:48 GMT+02:00 Jose Miguel Tejedor Fernandez <
jose.fernandez@rovio.com>:

> Hi,
>
> I am running a simple stream Flink job (Flink version 1.3.2 and 1.3.1)
> whose source and sink is a Kafka cluster 0.10.0.1.
>
> I am testing savepoints by stopping/resuming the job and when I checked
> the validity of the data sunk during the stop time I observed that some of
> the events have been lost.
>
> The stream of events is around 6K per 10 minutes and around 50% are lost.
> I share the code in case you can indicate me any hint.
>
>
> Job is resumed correctly from last savepoint and checkpoints configuration
> is as follow:
>
> env.setStateBackend(new FsStateBackend("s3://my_
> bucket/flink/checkpoints/"));
> env.enableCheckpointing(params.getLong("checkpoint.interval", 300000));
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
> params.getLong("checkpoint.minPause", 60 * 1000));
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(
> params.getInt("checkpoint.maxConcurrent", 1));
> env.getCheckpointConfig().setCheckpointTimeout(params.getLong("checkpoint.timeout",
> 10 * 60 * 1000));
>  env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.
> ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
> The Kafka consumer:
>
>         SingleOutputStreamOperator<Map<String, String>>
> createKafkaStream(Collection<String> topics, int parallelism, Properties
> kafkaProps, StreamExecutionEnvironment env,
>             int resetOffsetsTo, String eventTimeField)
>         {
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>         FlinkKafkaConsumer010<Map<String, String>> consumer = new
> FlinkKafkaConsumer010<>(
>                 new LinkedList<>(topics),
>                 new EventMapSchema(), kafkaProps);
>
>         DataStream<Map<String, String>> messageStream = env
>                 .addSource(consumer)
>                 .name("Kafka (" + StringUtils.join(topics, ", ") + ")")
>                 .setParallelism(parallelism);
>
>         return messageStream
>                 // discard events that don't have the event time field
>                 .filter(new MissingTimeStampFilter(eventTimeField))
>                 // provide ascending timestamps for
> TimeCharacteristic.EventTime
>                 .assignTimestampsAndWatermarks(new
> EventMapTimestampExtractor(eventTimeField));
>
>          }
>
>         ....
>
>         StreamTableUtils.registerTable("events", kafkaStream, fieldNames,
> tableEnv);
>
>         String sql = "SELECT\n" +
>                 "  field_1 AS a,\n" +
>                 "  field_2 AS b,\n" +
>                 "  field_3 AS c,\n" +
>                 "  field_4 AS d,\n" +
>                 "  field_5 AS e,\n" +
>                 " FROM events\n" +
>                 " WHERE field_1 IS NOT NULL";
>         LOG.info("sql: {}", sql);
>
>
>         Table result = tableEnv.sql(sql);
>         System.err.println("output fields: " + Arrays.toString(result.
> getSchema().getColumnNames()));
>
>         if (printToErr) {≤
>             printRows(result, tableEnv);
>         }
>
>         if (!StringUtils.isBlank(outputTopic)) {
>             TableSink<?> tableSink = new Kafka09CustomJsonTableSink(outputTopic,
> KafkaStreams.getProperties(params),
>                     new FlinkFixedPartitioner<>(), timestampColumn);
>             result.writeToSink(tableSink);
>         }
>
>         env.execute();
>
> Cheers
>
> BR
>
>
> *JM*
>
>
>