Posted to user@flink.apache.org by PedroMrChaves <pe...@gmail.com> on 2016/10/21 14:42:01 UTC

FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

Hello,

I am getting the following warning when a checkpoint is executed:

2016-10-21 16:31:54,229 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 5 @ 1477063914229
2016-10-21 16:31:54,233 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 5 (in 3 ms)
2016-10-21 16:31:54,234 WARN  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Received confirmation for unknown checkpoint id 5

This is the code I use to set up the environment and the Kafka consumer:

    /**
     * Flink execution environment configuration
     */
    private void setupEnvironmnet() {
        environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(CHECKPOINTING_INTERVAL);
        tableEnvironment = TableEnvironment.getTableEnvironment(environment);
    }

    /**
     * Kafka Consumer configuration
     */
    private void kafkaConsumer(String server, String topic) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", server);
        properties.setProperty("group.id", "Demo");
        stream = environment.addSource(new FlinkKafkaConsumer09<>(topic, new SimpleStringSchema(), properties))
                .map(new Parser());
    }


Any idea what the problem might be?

Thank you and regards,
Pedro Chaves



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumerBase-Received-confirmation-for-unknown-checkpoint-tp9674.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

I tracked down the problem and have a fix in this PR: https://github.com/apache/flink/pull/2706. Apart from the misleading warning, the code in the old version should still behave correctly.

Best,
Stefan

> On 27.10.2016 at 17:25, Robert Metzger <rm...@apache.org> wrote:
> 
> Hi,
> it would be nice if you could check with a stable version as well.
> 
> Thank you!
> 
> On Thu, Oct 27, 2016 at 9:58 AM, PedroMrChaves <pedro.mr.chaves@gmail.com> wrote:
> Hello,
> 
> I am using version 1.2-SNAPSHOT.
> I will try with a stable version to see if the problem persists.
> 
> Regards,
> Pedro Chaves.
> 
> 
> 


Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

Posted by Robert Metzger <rm...@apache.org>.
Hi,
it would be nice if you could check with a stable version as well.

Thank you!

On Thu, Oct 27, 2016 at 9:58 AM, PedroMrChaves <pe...@gmail.com>
wrote:

> Hello,
>
> I am using version 1.2-SNAPSHOT.
> I will try with a stable version to see if the problem persists.
>
> Regards,
> Pedro Chaves.
>
>
>

Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

Posted by PedroMrChaves <pe...@gmail.com>.
Hello,

I am using version 1.2-SNAPSHOT.
I will try with a stable version to see if the problem persists.

Regards,
Pedro Chaves. 




Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

Posted by Robert Metzger <rm...@apache.org>.
Hi Pedro,
The message is a bit unexpected for me as well, but it does not make the
checkpointing inconsistent. The only consequence of this warning is that
the offsets are not written to ZooKeeper.
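
To illustrate the point above, here is a minimal sketch, assuming the source remembers its offsets per checkpoint id when a snapshot is taken and commits them only when that id is confirmed. The class PendingOffsetsSketch and all of its members are hypothetical; this is not Flink's actual FlinkKafkaConsumerBase code, only a model of why an unknown id leads to a skipped commit and nothing worse.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of a checkpointed source.
 * All names are made up for illustration; this is not Flink code.
 */
class PendingOffsetsSketch {
    // Offsets recorded at snapshot time, keyed by checkpoint id.
    private final Map<Long, Long> pendingOffsets = new LinkedHashMap<>();
    // Offsets that were committed to the external system.
    final List<Long> committed = new ArrayList<>();
    // Warnings that would have been logged.
    final List<String> warnings = new ArrayList<>();

    /** Called when a checkpoint is triggered: remember the current offset. */
    void snapshot(long checkpointId, long currentOffset) {
        pendingOffsets.put(checkpointId, currentOffset);
    }

    /** Called when the coordinator confirms a completed checkpoint. */
    void notifyCheckpointComplete(long checkpointId) {
        Long offset = pendingOffsets.remove(checkpointId);
        if (offset == null) {
            // No offsets were recorded for this id: only the commit is skipped.
            warnings.add("Received confirmation for unknown checkpoint id " + checkpointId);
            return;
        }
        committed.add(offset);
    }

    public static void main(String[] args) {
        PendingOffsetsSketch source = new PendingOffsetsSketch();
        source.snapshot(4L, 100L);
        source.notifyCheckpointComplete(4L); // known id: its offset is committed
        source.notifyCheckpointComplete(5L); // unknown id: warning, no commit
        System.out.println("committed=" + source.committed);
        System.out.println("warnings=" + source.warnings);
    }
}
```

In this model the checkpointed state itself is never touched by the confirmation path, which is why the warning alone would not make the checkpoint inconsistent.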

Which Flink version are you using?



On Mon, Oct 24, 2016 at 7:25 PM, Aljoscha Krettek <al...@apache.org>
wrote:

> @Robert, do you have any idea what might be going on here?

Re: FlinkKafkaConsumerBase - Received confirmation for unknown checkpoint

Posted by Aljoscha Krettek <al...@apache.org>.
@Robert, do you have any idea what might be going on here?
