Posted to jira@kafka.apache.org by "Terry Beard (Jira)" <ji...@apache.org> on 2023/01/04 02:46:00 UTC

[jira] [Updated] (KAFKA-14566) Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces

     [ https://issues.apache.org/jira/browse/KAFKA-14566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Terry Beard updated KAFKA-14566:
--------------------------------
    Description: 
h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka Consumer and Producer constructors do not adequately support cleanup of underlying interceptor resources. 

Currently, the Kafka Consumer and Kafka Producer constructors delegate both creating and configuring each interceptor listed in the interceptor.classes property to AbstractConfig.getConfiguredInstances(), which returns a configured *List<ConsumerInterceptor<K,V>>* (or *List<ProducerInterceptor<K,V>>* for the producer).
h2. Kafka Consumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));{code}
 

This dual responsibility for creation and configuration is problematic when multiple interceptors are involved. Suppose one interceptor's configure method creates, or depends on objects that create, threads, connections, or other resources that require cleanup, and a subsequent interceptor's configure method throws a runtime exception. The exception causes a resource leak in the first interceptor: the interceptor container (ConsumerInterceptors/ProducerInterceptors) is never created, so the close method of the first interceptor, and indeed of any interceptor, is never called.
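The failure mode described above can be reproduced without Kafka. The following is a minimal sketch, assuming simplified stand-in types: DemoInterceptor, ResourceHoldingInterceptor, FailingInterceptor, and LeakDemo are hypothetical names for illustration and are not part of the Kafka API. The createAndConfigure helper only mimics the create-and-configure-in-one-step behavior the report attributes to getConfiguredInstances().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for an interceptor; not Kafka's ConsumerInterceptor.
interface DemoInterceptor extends AutoCloseable {
    void configure();

    @Override
    void close();
}

// Allocates a thread pool in configure(), like the leaking interceptor in the report.
class ResourceHoldingInterceptor implements DemoInterceptor {
    ExecutorService pool;

    @Override
    public void configure() {
        pool = Executors.newSingleThreadExecutor();
    }

    @Override
    public void close() {
        pool.shutdownNow();
    }
}

// Fails during configure(), like a misconfigured second interceptor.
class FailingInterceptor implements DemoInterceptor {
    @Override
    public void configure() {
        throw new IllegalStateException("bad config");
    }

    @Override
    public void close() { }
}

class LeakDemo {
    // Creates and configures in one step; an exception escapes before the list is returned.
    static List<DemoInterceptor> createAndConfigure(List<DemoInterceptor> candidates) {
        List<DemoInterceptor> configured = new ArrayList<>();
        for (DemoInterceptor interceptor : candidates) {
            interceptor.configure();
            configured.add(interceptor);
        }
        return configured;
    }

    // Returns true if the first interceptor's pool is still running after the failure.
    static boolean leaksAfterFailure() {
        ResourceHoldingInterceptor first = new ResourceHoldingInterceptor();
        List<DemoInterceptor> container = null; // plays the role of this.interceptors
        try {
            container = createAndConfigure(List.of(first, new FailingInterceptor()));
        } catch (RuntimeException e) {
            // The caller never received the list, so there is nothing it can close.
            if (container != null) {
                container.forEach(DemoInterceptor::close);
            }
        }
        boolean leaked = !first.pool.isShutdown();
        first.pool.shutdownNow(); // clean up so the demo itself does not leak
        return leaked;
    }
}
```

In this sketch the catch block finds container still null, so the first interceptor's pool is never shut down by the error path.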
h2. Kafka Consumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
....{code}
 

If the call above throws a runtime exception, the this.interceptors assignment below is never reached, so the container is never created.

 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);{code}
 
h2. Kafka Producer Constructor
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
 

If the call above throws a runtime exception, the this.interceptors assignment below is never reached, so the container is never created.

 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
.... {code}
 

Both the Kafka Consumer and Kafka Producer constructors wrap construction in a try/catch that calls close for resource cleanup:

 
{code:java}
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
 

However, the close implementation invoked from the catch block above cannot close the interceptors through their container, as shown below, because this.interceptors was never created.

 
{code:java}
private void close(long timeoutMs, boolean swallowException) {
 ....  
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
 ....{code}
 

This problem is magnified within a web server cluster, e.g. Confluent's REST Proxy server, where thousands of requests containing interceptor configuration failures can occur within seconds. The result is effectively an inadvertent denial-of-service attack: cluster resources are quickly exhausted, disrupting all service activities.
h2. PROPOSAL

To help ensure that the interceptor containers can invoke each interceptor's close method for proper resource cleanup, I propose defining a default open method, with no implementation and a checked exception, on the Consumer and Producer interceptor interfaces.  This open method will be responsible for creating threads and/or objects that use threads, connections, or other resources that require cleanup.  Additionally, making open a default method keeps it optional: its empty default behavior means it does nothing for classes that implement the interceptor interface without overriding it.
h2. Kafka Consumer Interceptor Default Open Implementation
{code:java}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
  
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    // Optional hook for allocating resources that need cleanup; does nothing by default.
    default void open() throws Exception { }
 
    void close();
}
{code}
 
h2. Kafka Producer Interceptor Default Open Implementation
{code:java}
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
   
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
   
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    // Optional hook for allocating resources that need cleanup; does nothing by default.
    default void open() throws Exception { }

    void close();
}
{code}
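To illustrate how an implementer might split work between configure and open under this proposal, here is a minimal sketch. SketchInterceptor is a simplified stand-in for the proposed interfaces (the record callbacks are omitted), and MetricsInterceptor, its fields, and its helper methods are hypothetical names chosen for illustration only.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Simplified stand-in for the proposed interface; the record callbacks are omitted.
interface SketchInterceptor extends AutoCloseable {
    void configure(Map<String, ?> configs);

    default void open() throws Exception { }

    @Override
    void close();
}

// Hypothetical interceptor that keeps configure() free of resource allocation.
class MetricsInterceptor implements SketchInterceptor {
    private String clientId;
    private ExecutorService reporter;

    @Override
    public void configure(Map<String, ?> configs) {
        // Only read settings here; nothing that would need cleanup.
        Object id = configs.get("client.id");
        clientId = id == null ? "unknown" : id.toString();
    }

    @Override
    public void open() {
        // Resources that need cleanup are created here, after configuration succeeded.
        reporter = Executors.newSingleThreadExecutor();
    }

    @Override
    public void close() {
        // close() may run even if open() was never reached, so guard against null.
        if (reporter != null) {
            reporter.shutdownNow();
        }
    }

    String clientId() {
        return clientId;
    }

    boolean isOpen() {
        return reporter != null && !reporter.isShutdown();
    }
}
```

Because configure only reads settings, a failure in another interceptor's configure leaves nothing behind in this one; the executor exists only between open and close.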
 

Additionally, the Kafka Consumer/Producer interceptor containers will implement a corresponding maybeOpen method, which throws a checked Exception.  It is named maybeOpen for backwards compatibility: it must determine whether an interceptor provides the newer open method before calling it.

 *NOTE:* Developers are encouraged to throw a more specific exception.
h2. Kafka Consumer Interceptors MaybeOpen Implementation
{code:java}
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.List;
import java.util.Arrays;
import java.util.Map;

public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;
    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
 
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Exception e) {
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.onCommit(offsets);
            } catch (Exception e) {
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }
    /**
     * Closes every interceptor in a container.
     */
    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close consumer interceptor ", e);
            }
        }
    }
    public List<ConsumerInterceptor<K, V>> getInterceptors() {
        return interceptors;
    }
    /**
     * Only interceptors which implement {@link ConsumerInterceptor#open()} are called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                // Compare method names with equals(); == compares object references.
                if (Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> "open".equals(method.getName()))) {
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open consumer interceptor ", e);
                throw e;
            }
        }
    }
} {code}
h2. Kafka Producer Interceptors MaybeOpen Implementation
{code:java}
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;

public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;
    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
  
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
 
    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = extractTopicPartition(record);
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                                    RecordBatch.NO_TIMESTAMP, -1, -1), exception);
                }
            } catch (Exception e) {
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
    public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
        return new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
    }
  
    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor ", e);
            }
        }
    }
    /**
     * Only interceptors which implement {@link ProducerInterceptor#open()} are called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                // Compare method names with equals(); == compares object references.
                if (Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> "open".equals(method.getName()))) {
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open producer interceptor ", e);
                throw e;
            }
        }
    }
}
{code}
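The name-based check in maybeOpen relies on Class.getMethods(), which returns a class's public methods including those inherited from its interfaces. The sketch below shows how that check behaves on stand-in types. Openable, LegacyStyle, NewStyle, and OpenCheck are hypothetical names, and the overridesOpen variant is an assumption added for comparison only, not part of the proposal.

```java
import java.util.Arrays;

// Stand-in for an interceptor interface that carries the default open().
interface Openable {
    default void open() throws Exception { }
}

// Does not override open(); it only inherits the default.
class LegacyStyle implements Openable { }

// Overrides open() explicitly.
class NewStyle implements Openable {
    @Override
    public void open() { }
}

class OpenCheck {
    // Name check as in maybeOpen, using equals() for the string comparison.
    static boolean hasOpen(Object target) {
        return Arrays.stream(target.getClass().getMethods())
                .anyMatch(method -> "open".equals(method.getName()));
    }

    // Stricter variant (an assumption): true only when a class, not an interface, declares open().
    static boolean overridesOpen(Object target) {
        try {
            return !target.getClass().getMethod("open").getDeclaringClass().isInterface();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

With these stand-ins, hasOpen matches both LegacyStyle and NewStyle, because the inherited default method is visible through getMethods(), while overridesOpen matches only NewStyle.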
 

In summary, after AbstractConfig.getConfiguredInstances() returns the configured interceptor instances, the Kafka Consumer/Producer constructor calls the maybeOpen method of its interceptor container.

 
If an exception occurs during maybeOpen, that is, in an interceptor's open method, the client constructor's try/catch calls the interceptor container's close method, which in turn loops through the interceptors and calls each one's close method to clean up resources allocated in open.

If an exception occurs in the configure method, all objects will be garbage collected, because configure must no longer be used for creating threads and/or objects that use threads, connections, or other resources that require cleanup.
h2. Kafka Consumer Constructor With MaybeOpen Method Call
{code:java}
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
...
        List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
   ...
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
        if (this.log != null) {
            close(0, true);
        }
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
{code}
h2. Kafka Producer Constructor with MaybeOpen Method Call
{code:java}
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time) {
    try {
      ...
        List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
  ...
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
{code}
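The workflow summarized above (build the container, call maybeOpen, and close every interceptor if opening fails) can be traced end to end with stand-in types. This is a minimal sketch under stated assumptions: FlowInterceptor, FlowInterceptors, RecordingInterceptor, and FlowDemo are hypothetical names, and the run method only imitates the shape of the client constructors' try/catch.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in interceptor interface with the proposed default open().
interface FlowInterceptor extends AutoCloseable {
    default void open() throws Exception { }

    @Override
    void close();
}

// Stand-in container mirroring the maybeOpen/close pair of the proposal.
class FlowInterceptors implements AutoCloseable {
    private final List<FlowInterceptor> interceptors;

    FlowInterceptors(List<FlowInterceptor> interceptors) {
        this.interceptors = interceptors;
    }

    void maybeOpen() throws Exception {
        for (FlowInterceptor interceptor : interceptors) {
            interceptor.open(); // the first failure propagates to the caller
        }
    }

    @Override
    public void close() {
        for (FlowInterceptor interceptor : interceptors) {
            try {
                interceptor.close();
            } catch (RuntimeException e) {
                // keep closing the remaining interceptors
            }
        }
    }
}

// Records lifecycle calls so their order can be inspected.
class RecordingInterceptor implements FlowInterceptor {
    private final String name;
    private final boolean failOnOpen;
    private final List<String> events;

    RecordingInterceptor(String name, boolean failOnOpen, List<String> events) {
        this.name = name;
        this.failOnOpen = failOnOpen;
        this.events = events;
    }

    @Override
    public void open() throws Exception {
        if (failOnOpen) {
            throw new Exception(name + " failed to open");
        }
        events.add(name + ".open");
    }

    @Override
    public void close() {
        events.add(name + ".close");
    }
}

class FlowDemo {
    // Imitates the constructor's try/catch: the container exists before maybeOpen runs.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        FlowInterceptors container = null;
        try {
            container = new FlowInterceptors(List.of(
                    new RecordingInterceptor("first", false, events),
                    new RecordingInterceptor("second", true, events)));
            container.maybeOpen();
        } catch (Throwable t) {
            if (container != null) {
                container.close();
            }
            events.add("constructor failed");
        }
        return events;
    }
}
```

Here FlowDemo.run() records first.open, first.close, second.close, and then constructor failed: because the container is assigned before maybeOpen is called, the error path can reach every interceptor's close method, including that of the interceptor whose open failed.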
 

  was:
h2. PROBLEM

The Consumer and Producer interceptor interfaces and their corresponding Kafka Consumer and Producer constructors do not adequately support cleanup of underlying interceptor resources. 

Currently within the Kafka Consumer and Kafka Producer constructors,  the AbstractConfig.getConfiguredInstances()  is delegated responsibility for both creating and configuring each interceptor listed in the interceptor.classes property and returns a configured  *List<ConsumerInterceptor<K,V>>* interceptors.
h2. Kafka Consumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
Kafka Producer Constructor
{code}
h2. Kafka Producer Constructor
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));{code}
 

This dual responsibility for both creation and configuration is problematic when it involves multiple interceptors where at least one interceptor's configure method implementation creates and/or depends on objects which creates threads, connections or other resources which requires clean up and the subsequent interceptor's configure method raises a runtime exception.  This raising of the runtime exception results produces a resource leakage in the first interceptor as the interceptor container i.e. ConsumerInterceptors/ProducerInterceptors are never created and therefore the first interceptor's and really any interceptor's close method are never called.
h2.  Kafka Consumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptor.class,
        Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
....{code}
If the above line results in a runtime exception, the below {*}{color:#ffab00}this{color}{*}.{*}{color:#403294}interceptors{color}{*} is never created. 

 
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList);{code}
 
h2. Kafka Producer Constructor
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
        ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ProducerInterceptor.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the below this.interceptors is never created. 
{code:java}
if (interceptors != null)
    this.interceptors = interceptors;
else
    this.interceptors = new ProducerInterceptors<>(interceptorList);
.... {code}
Although, both Kafka Consumer and Kafka Producer constructors try/catch implement  close for resource clean up, 
{code:java}
catch (Throwable t) {
    // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
    // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
    if (this.log != null) {
        close(0, true);
    }
    // now propagate the exception
    throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
their respective close implementation located in the catch above never calls the respective container interceptor close method below as the {*}{color:#ffab00}this{color}{*}.{*}{color:#403294}interceptors{color}{*} was never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
 ....  
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
 ....{code}
 

This problem is magnified within a webserver cluster i.e. Confluent's REST Proxy server where thousands of requests containing interceptor configuration failures can occur in seconds resulting in an inadvertent DDoS attack as cluster resources are quickly exhausted, disrupting all service activities.
h2. PROPOSAL

To help ensure the respective container interceptors are able to invoke their respective interceptor close methods for proper resource clean up, I propose defining a default open method with no implementation and a check exception on the respective Consumer/Producer interceptor interfaces.  This open method will be responsible for creating threads and/or objects which utilizes threads, connections or other resource which requires clean up.  Additionally, the default open method enables implementation optionality as it's empty default behavior means it will do nothing on unimplemented classes of this interceptor interface.  
h2. Kafka Consumer Interceptor Default Open Implementation
{code:java}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
  
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
    default void open() throws Exception {};
 
    void close();
}
{code}
 
h2. 
Kafka Producer Interceptor Default Open Implementation
{code:java}
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
   
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
   
    void onAcknowledgement(RecordMetadata metadata, Exception exception);
    default void open() throws Exception {};

    void close();
}
{code}
 

 

Additionally, the Kafka Consumer/Producer Interceptor containers will implement a corresponding maybeOpen method which throws a checked Exception.  It's called maybeOpen for backwards compatibility purpose as it must  determine whether an interceptor's interface contains the newer open method before calling it accordingly. 


 *NOTE:* Developers are encouraged to throw a more specific exception.
h2. Kafka Consumer Interceptors MaybeOpen Implementation

 
{code:java}
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.List;
import java.util.Arrays;
import java.util.Map;

public class ConsumerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
    private final List<ConsumerInterceptor<K, V>> interceptors;
    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
 
    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptRecords = interceptor.onConsume(interceptRecords);
            } catch (Exception e) {
                log.warn("Error executing interceptor onConsume callback", e);
            }
        }
        return interceptRecords;
    }
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.onCommit(offsets);
            } catch (Exception e) {
                log.warn("Error executing interceptor onCommit callback", e);
            }
        }
    }
    /**
     * Closes every interceptor in a container.
     */
    @Override
    public void close() {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close consumer interceptor ", e);
            }
        }
    }
    public List<ConsumerInterceptor<K, V>> getInterceptors() {
        return interceptors;
    }
    /**
     * Only interceptors which implement {@link ConsumerInterceptor#open()} are called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
            try {
                if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> method.getName() == "open")){
                    interceptor.open();
                }
             } catch (Exception e) {
                log.error("Failed to open consumer interceptor ", e);
                throw e;
            }
        }
    }
} {code}
 

 
h2. Kafka Producer Interceptors MaybeOpen Implementation

 
{code:java}
package org.apache.kafka.clients.producer.internals;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.util.Arrays;
import java.util.List;

public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;
    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }
   public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
               if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }
  
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
 
    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = extractTopicPartition(record);
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                                    RecordBatch.NO_TIMESTAMP, -1, -1), exception);
                }
            } catch (Exception e) {
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }
    public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K, V> record) {
        return new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
    }
  
    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor ", e);
            }
        }
    }
    /**
     * Only interceptors which implement {@link ProducerInterceptor#open()} are called by the container.  This is for backwards
     * compatibility as older interceptors do not contain the default open()
     * */
    public void maybeOpen() throws Exception {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method -> method.getName() == "open")){
                    interceptor.open();
                }
            } catch (Exception e) {
                log.error("Failed to open producer interceptor ", e);
                throw e;
            }
        }
    }
}
{code}
 

 

In summary, the overall workflow is that after the configured interceptor instances are returned by the AbstractConfig.getConfiguredInstances(),  the Kafka Consumer/Producer constructor's respective interceptor container maybeOpen method will be called.

 
If in the maybeOpen call, an exception occurs following the interceptor open method call, the respective client constructor's try/catch will call the interceptor container's close method which in-turn loops through and calls each interceptor's close method for clean up of resources allocated in the interceptor open method.  


If an exception occurs in the configure method all objects will be garbage collected as this method must no longer be used for creating threads and/or objects which utilizes threads, connections or other resources which requires clean up.  
h2. Kafka Consumer Constructor With MaybeOpen Method Call
{code:java}
KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
...
        List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class,
                Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
        this.interceptors = new ConsumerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
   ...
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized.
        if (this.log != null) {
            close(0, true);
        }
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
{code}
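The failure path through the constructor can be simulated without a Kafka dependency. The sketch below is an assumption-laden stand-in, not the actual client code: InterceptorLifecycleSketch, Interceptor, ResourceInterceptor, FailingInterceptor, Interceptors and construct are hypothetical names. The first interceptor acquires a pretend resource in open, the second fails in open, and the constructor-style try/catch closes the container so the first interceptor's resource is released.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the proposal's types; these are not the real Kafka classes.
final class InterceptorLifecycleSketch {

    /** Simplified interceptor contract: open() allocates, close() releases. */
    interface Interceptor extends AutoCloseable {
        default void open() throws Exception { }
        @Override
        void close();
    }

    /** Acquires a pretend resource in open() and releases it in close(). */
    static final class ResourceInterceptor implements Interceptor {
        boolean resourceHeld = false;
        @Override public void open() { resourceHeld = true; }
        @Override public void close() { resourceHeld = false; }
    }

    /** Fails in open(), simulating a broken interceptor. */
    static final class FailingInterceptor implements Interceptor {
        @Override public void open() { throw new IllegalStateException("cannot open"); }
        @Override public void close() { }
    }

    /** Simplified container: opens interceptors in order, closes all of them on request. */
    static final class Interceptors implements AutoCloseable {
        private final List<Interceptor> interceptors;

        Interceptors(List<Interceptor> interceptors) {
            this.interceptors = interceptors;
        }

        void maybeOpen() throws Exception {
            for (Interceptor interceptor : interceptors) {
                interceptor.open();
            }
        }

        @Override
        public void close() {
            for (Interceptor interceptor : interceptors) {
                try {
                    interceptor.close();
                } catch (Exception e) {
                    // Keep closing the remaining interceptors.
                }
            }
        }
    }

    /** Mimics the client constructor's try/catch; returns true if construction failed. */
    static boolean construct(List<Interceptor> list) {
        Interceptors container = new Interceptors(list); // container exists before open() runs
        try {
            container.maybeOpen();
            return false;
        } catch (Exception e) {
            container.close(); // releases whatever earlier interceptors opened
            return true;
        }
    }

    public static void main(String[] args) {
        ResourceInterceptor first = new ResourceInterceptor();
        List<Interceptor> list = new ArrayList<>();
        list.add(first);
        list.add(new FailingInterceptor());
        boolean failed = construct(list);
        System.out.println("construction failed: " + failed
                + ", resource still held: " + first.resourceHeld);
    }
}
```

The point of the sketch is ordering: because the container is created before any open call, a failure in a later interceptor still leaves a container whose close method can reach the earlier ones.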
 

 
h2. Kafka Producer Constructor With MaybeOpen Method Call

 
{code:java}
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time) {
    try {
      ...
        List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        this.interceptors.maybeOpen();
  ...
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121
        close(Duration.ofMillis(0), true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
{code}
 


> Add A No Implementation Default Open Method To Consumer and Producer Interceptor Interfaces
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14566
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14566
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Terry Beard
>            Priority: Major
>


