You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "Slotterback, Chris" <Ch...@comcast.com> on 2019/05/06 14:20:31 UTC

Class loader premature closure - NoClassDefFoundError: org/apache/kafka/clients/NetworkClient

Hey Flink users,

Currently using Flink 1.7.2 with a job using FlinkKafkaProducer with its write semantic set to Semantic.EXACTLY_ONCE. When there is a job failure and restart (in our case from checkpoint timeout), it begins a failure loop that requires a cancellation and resubmission to fix. The expected and desired outcome should be a recovery from failure and the job restarts successfully. Some digging revealed an issue where the class loader closes before the connection to kafka is fully terminated resulting in a NoClassDefFoundError. A description of what is happening has already been described here: https://heap.io/blog/engineering/missing-scala-class-noclassdeffounderror, though we are experiencing this with kafka, not Redis:

5/3/19
3:14:18.780 PM
2019-05-03 15:14:18,780 ERROR org.apache.kafka.common.utils.KafkaThread                     - Uncaught exception in thread 'kafka-producer-network-thread | producer-80':
java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)
Collapse
date_hour =       15

Interestingly, this only happens when we extend the FlinkKafkaProducer for the purposes of setting the write semantic to EXACTLY_ONCE. When running with the default FlinkKafkaProducer (using Semantic.AT_LEAST_ONCE), the class loader has no issues disconnecting the kafka client on job failure, and the job recovers just fine. We are not doing anything particularly strange in our extended producer as far as I can tell:

public class CustomFlinkKafkaProducer<IN> extends FlinkKafkaProducer<IN> {

  public CustomFlinkKafkaProducer(Properties properties, String topicId,
      AvroKeyedSerializer<IN> serializationSchema) {
    super(
        topicId,
        serializationSchema,
        properties,
        Optional.of(new FlinkFixedPartitioner<>()),
        Semantic.EXACTLY_ONCE,
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
  }
  public static Properties getPropertiesFromBrokerList(String brokerList) {
    […]
  }
}



Re: Class loader premature closure - NoClassDefFoundError: org/apache/kafka/clients/NetworkClient

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for reporting this issue Chris. It looks indeed as if FLINK-10455
has not been fully fixed. I've reopened it and linked this mailing list
thread. If you want, then you could write to the JIRA thread as well. What
would be super helpful is if you manage to create a reproducing example for
further debugging.

Cheers,
Till

On Tue, May 7, 2019 at 4:04 PM Rohan Thimmappa <ro...@gmail.com>
wrote:

> It is a blocker for exactly once support from flink kafka producer.
>
> This issue reported and closed. but still reproducible
> https://issues.apache.org/jira/browse/FLINK-10455
>
> On Mon, May 6, 2019 at 10:20 AM Slotterback, Chris <
> Chris_Slotterback@comcast.com> wrote:
>
>> Hey Flink users,
>>
>>
>>
>> Currently using Flink 1.7.2 with a job using FlinkKafkaProducer with its
>> write semantic set to Semantic.EXACTLY_ONCE. When there is a job failure
>> and restart (in our case from checkpoint timeout), it begins a failure loop
>> that requires a cancellation and resubmission to fix. The expected and
>> desired outcome should be a recovery from failure and the job restarts
>> successfully. Some digging revealed an issue where the class loader closes
>> before the connection to kafka is fully terminated resulting in a
>> NoClassDefFoundError. A description of what is happening has already been
>> described here:
>> https://heap.io/blog/engineering/missing-scala-class-noclassdeffounderror,
>> though we are experiencing this with kafka, not Redis:
>>
>>
>>
>> 5/3/19
>>
>> 3:14:18.780 PM
>>
>> 2019-05-03 15:14:18,780 ERROR
>> org.apache.kafka.common.utils.KafkaThread                     - Uncaught
>> exception in thread 'kafka-producer-network-thread | producer-80':
>>
>> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>>
>> at
>> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
>>
>> at
>> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
>>
>> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
>>
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
>>
>> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Collapse
>>
>> date_hour =       15
>>
>>
>>
>> Interestingly, this only happens when we extend the FlinkKafkaProducer
>> for the purposes of setting the write semantic to EXACTLY_ONCE. When
>> running with the default FlinkKafkaProducer (using Semantic.AT_LEAST_ONCE),
>> the class loader has no issues disconnecting the kafka client on job
>> failure, and the job recovers just fine. We are not doing anything
>> particularly strange in our extended producer as far as I can tell:
>>
>>
>>
>> public class *CustomFlinkKafkaProducer*<IN> *extends*
>> *FlinkKafkaProducer*<IN> {
>>
>>
>>
>>   public *CustomFlinkKafkaProducer*(Properties properties, String
>> topicId,
>>
>>       AvroKeyedSerializer<IN> serializationSchema) {
>>
>>     super(
>>
>>         topicId,
>>
>>         serializationSchema,
>>
>>         properties,
>>
>>         Optional.of(new FlinkFixedPartitioner<>()),
>>
>>         *Semantic.EXACTLY_ONCE*,
>>
>>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>>
>>   }
>>
>>   public static Properties getPropertiesFromBrokerList(String brokerList)
>> {
>>
>>     […]
>>
>>   }
>>
>> }
>>
>>
>>
>>
>>
>
>
> --
> Thanks
> Rohan
>

Re: Class loader premature closure - NoClassDefFoundError: org/apache/kafka/clients/NetworkClient

Posted by Rohan Thimmappa <ro...@gmail.com>.
It is a blocker for exactly once support from flink kafka producer.

This issue reported and closed. but still reproducible
https://issues.apache.org/jira/browse/FLINK-10455

On Mon, May 6, 2019 at 10:20 AM Slotterback, Chris <
Chris_Slotterback@comcast.com> wrote:

> Hey Flink users,
>
>
>
> Currently using Flink 1.7.2 with a job using FlinkKafkaProducer with its
> write semantic set to Semantic.EXACTLY_ONCE. When there is a job failure
> and restart (in our case from checkpoint timeout), it begins a failure loop
> that requires a cancellation and resubmission to fix. The expected and
> desired outcome should be a recovery from failure and the job restarts
> successfully. Some digging revealed an issue where the class loader closes
> before the connection to kafka is fully terminated resulting in a
> NoClassDefFoundError. A description of what is happening has already been
> described here:
> https://heap.io/blog/engineering/missing-scala-class-noclassdeffounderror,
> though we are experiencing this with kafka, not Redis:
>
>
>
> 5/3/19
>
> 3:14:18.780 PM
>
> 2019-05-03 15:14:18,780 ERROR
> org.apache.kafka.common.utils.KafkaThread                     - Uncaught
> exception in thread 'kafka-producer-network-thread | producer-80':
>
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>
> at
> org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
>
> at
> org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
>
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
>
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
>
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Collapse
>
> date_hour =       15
>
>
>
> Interestingly, this only happens when we extend the FlinkKafkaProducer for
> the purposes of setting the write semantic to EXACTLY_ONCE. When running
> with the default FlinkKafkaProducer (using Semantic.AT_LEAST_ONCE), the
> class loader has no issues disconnecting the kafka client on job failure,
> and the job recovers just fine. We are not doing anything particularly
> strange in our extended producer as far as I can tell:
>
>
>
> public class *CustomFlinkKafkaProducer*<IN> *extends* *FlinkKafkaProducer*<IN>
> {
>
>
>
>   public *CustomFlinkKafkaProducer*(Properties properties, String topicId,
>
>       AvroKeyedSerializer<IN> serializationSchema) {
>
>     super(
>
>         topicId,
>
>         serializationSchema,
>
>         properties,
>
>         Optional.of(new FlinkFixedPartitioner<>()),
>
>         *Semantic.EXACTLY_ONCE*,
>
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>
>   }
>
>   public static Properties getPropertiesFromBrokerList(String brokerList) {
>
>     […]
>
>   }
>
> }
>
>
>
>
>


-- 
Thanks
Rohan