Posted to dev@flink.apache.org by "Arnaud Linz (Jira)" <ji...@apache.org> on 2020/03/09 14:57:00 UTC

[jira] [Created] (FLINK-16509) FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails

Arnaud Linz created FLINK-16509:
-----------------------------------

             Summary: FlinkKafkaConsumerBase tries to log a context that may not have been initialized and fails
                 Key: FLINK-16509
                 URL: https://issues.apache.org/jira/browse/FLINK-16509
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.10.0
         Environment: Unit test on local cluster, calling a unit test local kafka server.
            Reporter: Arnaud Linz


The new code of FlinkKafkaConsumerBase#initializeState() logs the restored state with:

```
    LOG.info("Consumer subtask {} restored state: {}.", getRuntimeContext().getIndexOfThisSubtask(), restoredState);
} else {
    LOG.info("Consumer subtask {} has no restore state.", getRuntimeContext().getIndexOfThisSubtask());
}
```

whereas the old (1.8.0) class logged without calling getRuntimeContext():

```
    LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
} else {
    LOG.info("No restore state for FlinkKafkaConsumer.");
}
```

This causes a regression in my Kafka source unit test, which now fails with the following exception:

``` 
java.lang.IllegalStateException: The runtime context has not been initialized.
    at org.apache.flink.api.common.functions.AbstractRichFunction.getRuntimeContext(AbstractRichFunction.java:53)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:886)
``` 
The context is not always available at that point (I guess initializeState is called before open).
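
To illustrate the ordering problem, here is a minimal self-contained sketch. It does not use Flink: StubRuntimeContext, StubRichFunction and StubConsumer are hypothetical stand-ins that only mimic the behaviour visible in the stack trace above (a context getter that throws until a context has been set). It is a sketch under those assumptions, not the actual Flink implementation.

```java
// Self-contained sketch; the Stub* classes are hypothetical stand-ins, not Flink classes.
class RuntimeContextOrderingSketch {

    /** Hypothetical stand-in for a runtime context; only the subtask index is modelled. */
    static final class StubRuntimeContext {
        private final int indexOfThisSubtask;

        StubRuntimeContext(int indexOfThisSubtask) {
            this.indexOfThisSubtask = indexOfThisSubtask;
        }

        int getIndexOfThisSubtask() {
            return indexOfThisSubtask;
        }
    }

    /** Stand-in for a rich function whose context getter fails until a context is set. */
    static class StubRichFunction {
        private StubRuntimeContext runtimeContext;

        void setRuntimeContext(StubRuntimeContext runtimeContext) {
            this.runtimeContext = runtimeContext;
        }

        StubRuntimeContext getRuntimeContext() {
            if (runtimeContext == null) {
                // Same message as in the reported stack trace.
                throw new IllegalStateException("The runtime context has not been initialized.");
            }
            return runtimeContext;
        }
    }

    /** Stand-in consumer whose initializeState() builds its log line from the context. */
    static class StubConsumer extends StubRichFunction {
        String initializeState() {
            return "Consumer subtask " + getRuntimeContext().getIndexOfThisSubtask()
                    + " has no restore state.";
        }
    }

    /** Calls initializeState() with no context set and returns the exception message. */
    static String callWithoutContext() {
        StubConsumer consumer = new StubConsumer();
        try {
            return consumer.initializeState();
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Sets a context for subtask 0 first, then calls initializeState(). */
    static String callWithContext() {
        StubConsumer consumer = new StubConsumer();
        consumer.setRuntimeContext(new StubRuntimeContext(0));
        return consumer.initializeState();
    }

    public static void main(String[] args) {
        System.out.println(callWithoutContext());
        System.out.println(callWithContext());
    }
}
```

In this sketch the log line can only be built once a context has been set, so any caller that invokes initializeState() first hits the IllegalStateException. The 1.8.0-style log statements never touched the context, so they did not depend on call order.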