Posted to jira@kafka.apache.org by "Terry Beard (Jira)" <ji...@apache.org> on 2023/01/04 14:58:00 UTC

[jira] [Updated] (KAFKA-14565) Interceptor Resource Leakage Prevention

     [ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14565:
--------------------------------
    Summary: Interceptor Resource Leakage Prevention  (was: Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces)

> Interceptor Resource Leakage Prevention
> ---------------------------------------
>
>                 Key: KAFKA-14565
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14565
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Assignee: Terry Beard
>            Priority: Major
>
> h2. PROBLEM
> The Consumer and Producer interceptor interfaces, and the corresponding Kafka Consumer and Producer constructors, do not adequately support cleanup of underlying interceptor resources.
> Currently, within the Kafka Consumer and Kafka Producer constructors, AbstractConfig.getConfiguredInstances() is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property, and it returns a configured List<ConsumerInterceptor<K,V>> (or List<ProducerInterceptor<K,V>>) of interceptors.
> h2. Kafka Consumer Constructor
>  
> {code:java}
> try {
> ....
> List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
>  {code}
>  
>  
> h2. Kafka Producer Constructor
> {code:java}
> try {
> ....
> List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ProducerInterceptor.class,
>         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
>  {code}
> This dual responsibility for both creation and configuration is problematic when multiple interceptors are involved: if one interceptor's configure method creates, or depends on, objects that create threads, connections or other resources requiring cleanup, and a subsequent interceptor's configure method raises a runtime exception, the first interceptor leaks those resources. The interceptor container (ConsumerInterceptors/ProducerInterceptors) is never created, and therefore the close method of the first interceptor, and indeed of any interceptor, is never called.
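The failure mode described above can be reproduced without Kafka on the classpath. The following is a minimal sketch under stated assumptions: `Interceptor`, `createAndConfigure` and `constructClient` are hypothetical stand-ins for the interceptor interfaces, AbstractConfig.getConfiguredInstances() and the client constructor; they are not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Self-contained illustration; none of these types are Kafka classes.
class ConfigureLeakDemo {
    /** Hypothetical stand-in for an interceptor: configured once, closed once. */
    interface Interceptor extends AutoCloseable {
        void configure();
        @Override void close();
    }

    /** Acquires a (simulated) resource inside configure(), like the first interceptor in the report. */
    static class ResourceHoldingInterceptor implements Interceptor {
        private final AtomicInteger openResources;
        ResourceHoldingInterceptor(AtomicInteger openResources) { this.openResources = openResources; }
        @Override public void configure() { openResources.incrementAndGet(); }
        @Override public void close() { openResources.decrementAndGet(); }
    }

    /** Fails during configure(), like the subsequent interceptor in the report. */
    static class FailingInterceptor implements Interceptor {
        @Override public void configure() { throw new IllegalStateException("bad config"); }
        @Override public void close() { }
    }

    /** Hypothetical analogue of getConfiguredInstances(): create and configure in one pass. */
    static List<Interceptor> createAndConfigure(List<Interceptor> candidates) {
        List<Interceptor> configured = new ArrayList<>();
        for (Interceptor candidate : candidates) {
            candidate.configure();   // an exception here discards the partially built list
            configured.add(candidate);
        }
        return configured;
    }

    /** Mimics the client constructor; returns the number of resources left open. */
    static int constructClient() {
        AtomicInteger openResources = new AtomicInteger();
        List<Interceptor> container = null;
        try {
            container = createAndConfigure(Arrays.asList(
                    new ResourceHoldingInterceptor(openResources), new FailingInterceptor()));
        } catch (RuntimeException e) {
            // Mirrors the constructor's catch block: only an assigned container can be closed.
            if (container != null) {
                container.forEach(Interceptor::close);
            }
        }
        return openResources.get();
    }

    public static void main(String[] args) {
        System.out.println("resources still open: " + constructClient()); // prints 1
    }
}
```

Because the exception escapes before the list is assigned, the catch block has nothing to close, and the resource acquired by the first interceptor stays open.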
> h2. KafkaConsumer Constructor
> {code:java}
> try {
> ....
> List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ConsumerInterceptor.class,
>         Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> .... {code}
> If the above call throws a runtime exception, this.interceptors (below) is never assigned.
> {code:java}
> this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
> h2. Kafka Producer Constructor
> {code:java}
> try {
> ....
> List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>         ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>         ProducerInterceptor.class,
>         Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)); {code}
> If the above call throws a runtime exception, this.interceptors (below) is never assigned.
> {code:java}
> if (interceptors != null)
>     this.interceptors = interceptors;
> else
>     this.interceptors = new ProducerInterceptors<>(interceptorList);
> .... {code}
>  
> Although both the Kafka Consumer and Kafka Producer constructors call close in their try/catch blocks for resource cleanup,
> {code:java}
> ...
> catch (Throwable t) {
>     // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
>     // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
>     if (this.log != null) {
>         close(0, true);
>     }
>     // now propagate the exception
>     throw new KafkaException("Failed to construct kafka consumer", t);
> } {code}
> the close implementation invoked from the catch block above never calls the interceptor container's close method below, because *this.interceptors* was never created.
> {code:java}
> private void close(long timeoutMs, boolean swallowException) {
>  ....  
> Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
>   .... {code}
> This problem is magnified within a web server cluster, e.g. Confluent's REST Proxy server, where thousands of requests containing interceptor configuration failures can occur within seconds, resulting in an inadvertent DDoS attack as cluster resources are quickly exhausted, disrupting all service activities.
> h2. PROPOSAL
> To help ensure the interceptor containers can invoke each interceptor's close method for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the Consumer and Producer interceptor interfaces. This open method will be responsible for creating threads and/or objects which use threads, connections or other resources requiring cleanup. Additionally, the default open method makes implementation optional: because its default body is empty, it does nothing for implementations of the interceptor interface that do not override it.
>  
> {code:java}
> package org.apache.kafka.clients.consumer;
> import org.apache.kafka.common.Configurable;
> import org.apache.kafka.common.TopicPartition;
> import java.util.Map;
> public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {    
>     ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);    
>     void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);    
>     default void open() throws Exception {}
>    
>     void close();
> }
>  {code}
>  
>  
> {code:java}
> package org.apache.kafka.clients.producer;
> import org.apache.kafka.common.Configurable;
> public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
>    
>     ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);    
>     void onAcknowledgement(RecordMetadata metadata, Exception exception);
>     
>     default void open() throws Exception {}
>     
>     void close();
> }
>  {code}
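To illustrate how an interceptor author might divide work between configure and open under this proposal, here is a minimal sketch. It assumes a hypothetical `LifecycleInterceptor` interface that mirrors the proposed shape; `MetricsInterceptor`, `holdsResources` and the use of the "client.id" key are illustrative choices, not part of Kafka.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Self-contained illustration; none of these types are Kafka classes.
class OpenLifecycleDemo {
    /** Hypothetical stand-in for the proposed contract: configure, optional open, close. */
    interface LifecycleInterceptor extends AutoCloseable {
        void configure(Map<String, ?> configs);
        default void open() throws Exception { }
        @Override void close();
    }

    /** Hypothetical interceptor that validates settings in configure() and allocates in open(). */
    static class MetricsInterceptor implements LifecycleInterceptor {
        private String clientId;
        private ExecutorService reporter;

        @Override public void configure(Map<String, ?> configs) {
            // Validation only: nothing here needs cleanup if a later interceptor fails.
            Object id = configs.get("client.id");
            if (id == null) {
                throw new IllegalArgumentException("client.id is required");
            }
            clientId = id.toString();
        }

        @Override public void open() {
            // Resources that require cleanup are created here, after all configure() calls succeeded.
            reporter = Executors.newSingleThreadExecutor();
        }

        @Override public void close() {
            // Safe to call whether or not open() ran.
            if (reporter != null) {
                reporter.shutdownNow();
                reporter = null;
            }
        }

        boolean holdsResources() { return reporter != null; }
    }

    public static void main(String[] args) {
        MetricsInterceptor interceptor = new MetricsInterceptor();
        interceptor.configure(Collections.singletonMap("client.id", "demo"));
        System.out.println("after configure: " + interceptor.holdsResources()); // false
        interceptor.open();
        System.out.println("after open: " + interceptor.holdsResources());      // true
        interceptor.close();
        System.out.println("after close: " + interceptor.holdsResources());     // false
    }
}
```

Keeping close() tolerant of a missing open() matters here, since the container may close interceptors that were configured but never opened.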
>  
>  
> Additionally, the Kafka Consumer/Producer interceptor containers will implement a corresponding maybeOpen method which throws a checked Exception. It is called maybeOpen for backwards compatibility purposes, as it must determine whether an interceptor's interface contains the newer open method before calling it.
> {*}NOTE{*}: Developers are encouraged to throw a more specific exception.
>  
> {code:java}
> package org.apache.kafka.clients.consumer.internals;
> import org.apache.kafka.clients.consumer.ConsumerInterceptor;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.OffsetAndMetadata;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> import org.apache.kafka.common.TopicPartition;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.Closeable;
> import java.util.List;
> import java.util.Arrays;
> import java.util.Map;
> public class ConsumerInterceptors<K, V> implements Closeable {
>     private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
>     private final List<ConsumerInterceptor<K, V>> interceptors;
>     public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }
>     public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
>         ConsumerRecords<K, V> interceptRecords = records;
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptRecords = interceptor.onConsume(interceptRecords);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue calling other interceptors
>                 log.warn("Error executing interceptor onConsume callback", e);
>             }
>         }
>         return interceptRecords;
>     }    
>     public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.onCommit(offsets);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, just log
>                 log.warn("Error executing interceptor onCommit callback", e);
>             }
>         }
>     }    
>     @Override
>     public void close() {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close consumer interceptor ", e);
>             }
>         }
>     }
>     public List<ConsumerInterceptor<K, V>> getInterceptors() {
>         return interceptors;
>     }
>     /**
>      * Only interceptors which implement {@link ConsumerInterceptor#open()} are called by the container. This is for backwards
>      * compatibility, as older interceptors do not contain the default open().
>      */
>     public void maybeOpen() throws Exception {
>         for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
>             try {
>                 // Compare method names with equals(); == only compares object references.
>                 if (Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> "open".equals(method.getName()))) {
>                     interceptor.open();
>                 }
>             } catch (Exception e) {
>                 log.error("Failed to open consumer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> } {code}
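One point worth checking about the name-based lookup in maybeOpen is how Class.getMethods() treats inherited default methods. The sketch below uses hypothetical stand-in types (`Interceptor`, `LegacyInterceptor`, `OverridingInterceptor`) rather than the Kafka interfaces. It suggests that an implementation which never overrides open() still exposes it through reflection, and shows one possible way, offered only as an assumption and not as part of the proposal, to detect an actual override by inspecting the declaring class.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

// Self-contained illustration; none of these types are Kafka classes.
class DefaultMethodLookupDemo {
    /** Hypothetical stand-in for an interceptor interface with the proposed default method. */
    interface Interceptor {
        default void open() throws Exception { }
    }

    /** Written without any knowledge of open(); it simply inherits the default. */
    static class LegacyInterceptor implements Interceptor { }

    /** Provides its own open() implementation. */
    static class OverridingInterceptor implements Interceptor {
        @Override public void open() { }
    }

    /** Same style of check as maybeOpen: is there a public no-arg method named "open"? */
    static boolean exposesOpen(Object candidate) {
        return Arrays.stream(candidate.getClass().getMethods())
                .anyMatch(m -> "open".equals(m.getName()) && m.getParameterCount() == 0);
    }

    /** Alternative check (an assumption): does a class, rather than the interface, declare open()? */
    static boolean overridesOpen(Object candidate) {
        try {
            Method open = candidate.getClass().getMethod("open");
            return open.getDeclaringClass() != Interceptor.class;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(exposesOpen(new LegacyInterceptor()));       // true: default is inherited
        System.out.println(overridesOpen(new LegacyInterceptor()));     // false
        System.out.println(overridesOpen(new OverridingInterceptor())); // true
    }
}
```

If the name check is true for every implementation, calling the default open() directly would behave the same way, since the inherited default body is empty.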
>  
>  
>  
> {code:java}
> package org.apache.kafka.clients.producer.internals;
> import org.apache.kafka.clients.producer.ProducerInterceptor;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import org.apache.kafka.clients.producer.RecordMetadata;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.record.RecordBatch;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import java.io.Closeable;
> import java.util.Arrays;
> import java.util.List;
> public class ProducerInterceptors<K, V> implements Closeable {
>     private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
>     private final List<ProducerInterceptor<K, V>> interceptors;
>     public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
>         this.interceptors = interceptors;
>     }
>     public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
>         ProducerRecord<K, V> interceptRecord = record;
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptRecord = interceptor.onSend(interceptRecord);
>             } catch (Exception e) {
>                 // do not propagate interceptor exception, log and continue calling other interceptors
>                 // be careful not to throw exception from here
>                 if (record != null)
>                     log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
>                 else
>                     log.warn("Error executing interceptor onSend callback", e);
>             }
>         }
>         return interceptRecord;
>     }    
>     public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.onAcknowledgement(metadata, exception);
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement callback", e);
>             }
>         }
>     }    
>     public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 if (record == null && interceptTopicPartition == null) {
>                     interceptor.onAcknowledgement(null, exception);
>                 } else {
>                     if (interceptTopicPartition == null) {
>                         interceptTopicPartition = extractTopicPartition(record);
>                     }
>                     interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
>                                     RecordBatch.NO_TIMESTAMP, -1, -1), exception);
>                 }
>             } catch (Exception e) {
>                 // do not propagate interceptor exceptions, just log
>                 log.warn("Error executing interceptor onAcknowledgement callback", e);
>             }
>         }
>     }    
>     public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
>         return new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
>     }
>     
>     @Override
>     public void close() {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 interceptor.close();
>             } catch (Exception e) {
>                 log.error("Failed to close producer interceptor ", e);
>             }
>         }
>     }    
>     /**
>      * Only interceptors which implement {@link ProducerInterceptor#open()} are called by the container.  This is for backwards
>      * compatibility as older interceptors do not contain the default open()
>      * */
>     public void maybeOpen() throws Exception {
>         for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
>             try {
>                 // Compare method names with equals(); == only compares object references.
>                 if (Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> "open".equals(method.getName()))) {
>                     interceptor.open();
>                 }
>             } catch (Exception e) {
>                 log.error("Failed to open producer interceptor ", e);
>                 throw e;
>             }
>         }
>     }
> }
>  {code}
> In summary, the overall workflow is that after the configured interceptor instances are returned by AbstractConfig.getConfiguredInstances(), the Kafka Consumer/Producer constructor calls the maybeOpen method of its interceptor container.
> If an exception is thrown by an interceptor's open method during the maybeOpen call, the client constructor's try/catch will call the interceptor container's close method, which in turn loops through and calls each interceptor's close method to clean up resources allocated in the open methods.
>  
> If an exception occurs in the configure method, all objects will simply be garbage collected, because configure must no longer be used for creating threads and/or objects which use threads, connections or other resources requiring cleanup.
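The workflow summarized above can be sketched end to end without Kafka. This is an illustration under stated assumptions: `Interceptor`, `Container` and `constructClient` are hypothetical stand-ins for the interceptor interfaces, the ConsumerInterceptors/ProducerInterceptors containers and the client constructor, and the container here calls open() unconditionally for brevity.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Self-contained illustration; none of these types are Kafka classes.
class MaybeOpenWorkflowDemo {
    /** Hypothetical stand-in for an interceptor with the proposed default open(). */
    interface Interceptor extends AutoCloseable {
        default void open() throws Exception { }
        @Override void close();
    }

    /** Hypothetical stand-in for the interceptor container. */
    static class Container implements AutoCloseable {
        private final List<Interceptor> interceptors;
        Container(List<Interceptor> interceptors) { this.interceptors = interceptors; }

        void maybeOpen() throws Exception {
            for (Interceptor interceptor : interceptors) {
                interceptor.open();   // the first failure propagates to the caller
            }
        }

        @Override public void close() {
            for (Interceptor interceptor : interceptors) {
                try {
                    interceptor.close();
                } catch (Exception e) {
                    // keep closing the remaining interceptors
                }
            }
        }
    }

    /** Allocates a (simulated) resource in open() and releases it in close(). */
    static class ResourceInterceptor implements Interceptor {
        private final AtomicInteger openResources;
        private boolean opened;
        ResourceInterceptor(AtomicInteger openResources) { this.openResources = openResources; }
        @Override public void open() { openResources.incrementAndGet(); opened = true; }
        @Override public void close() {
            if (opened) { openResources.decrementAndGet(); opened = false; }
        }
    }

    /** Fails while opening, after an earlier interceptor has already allocated. */
    static class FailingOpenInterceptor implements Interceptor {
        @Override public void open() throws Exception { throw new Exception("cannot connect"); }
        @Override public void close() { }
    }

    /** Mimics the client constructor; returns the number of resources left open. */
    static int constructClient() {
        AtomicInteger openResources = new AtomicInteger();
        Container container = null;
        try {
            // The container is assigned before any resource is allocated.
            container = new Container(Arrays.asList(
                    new ResourceInterceptor(openResources), new FailingOpenInterceptor()));
            container.maybeOpen();
        } catch (Exception e) {
            if (container != null) {
                container.close();   // reaches every interceptor, including the opened one
            }
        }
        return openResources.get();
    }

    public static void main(String[] args) {
        System.out.println("resources still open: " + constructClient()); // prints 0
    }
}
```

The difference from the configure-time failure is ordering: the container exists before any open() call runs, so the cleanup path in the catch block always has something to close.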
> h2. Kafka Consumer Constructor maybeOpen example
>  
> {code:java}
> ...
>         List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>                 ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                 ConsumerInterceptor.class,
>                 Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
>         this.interceptors = new ConsumerInterceptors<>(interceptorList);
>         this.interceptors.maybeOpen();
> ...{code}
> h2. Kafka Producer Constructor maybeOpen example
> {code:java}
> ...
>         List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
>                 ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
>                 ProducerInterceptor.class,
>                 Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
>         if (interceptors != null)
>             this.interceptors = interceptors;
>         else
>             this.interceptors = new ProducerInterceptors<>(interceptorList);
>         this.interceptors.maybeOpen();  
> ...{code}
>  
>  
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)