Posted to jira@kafka.apache.org by "Sergey Lemekhov (Jira)" <ji...@apache.org> on 2021/12/08 11:02:00 UTC

[jira] [Created] (KAFKA-13519) Same JAAS configuration used for all producers

Sergey Lemekhov created KAFKA-13519:
---------------------------------------

             Summary: Same JAAS configuration used for all producers
                 Key: KAFKA-13519
                 URL: https://issues.apache.org/jira/browse/KAFKA-13519
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.0.0
            Reporter: Sergey Lemekhov
         Attachments: AzureAuthCallbackHandler.java, OAuthBearerTokenImpl.java

h3. Problem

Sending messages to more than one Kafka cluster fails when instances of {{org.apache.kafka.clients.producer.KafkaProducer}} from the {{kafka-clients}} Java library are configured with the same {{SASL_JAAS_CONFIG}} value.
Only one {{org.apache.kafka.common.security.authenticator.LoginManager}} is created for all of the clusters (the {{org.apache.kafka.common.security.authenticator.LoginManager#DYNAMIC_INSTANCES}} map contains only one entry).
h3. How to reproduce

Create two {{KafkaProducer}} instances with the following configuration (the producers should target different Kafka clusters, so each has a different {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting):
{code:java}
Properties properties = new Properties();
properties.put("security.protocol", "SASL_SSL");
properties.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
properties.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, AzureAuthCallbackHandler.class); // custom callback handler class; in my case it targets Azure Event Hubs with Kafka API support
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;");

// custom settings read by the callback handler
properties.put(AzureAuthCallbackHandler.AUTHORITY_CONFIG, "https://login.microsoftonline.com/" + tenantId); // Azure tenant id
properties.put(AzureAuthCallbackHandler.APP_ID_CONFIG, appId); // Azure OAuth 2.0 app id
properties.put(AzureAuthCallbackHandler.APP_SECRET_CONFIG, appSecret); // Azure OAuth 2.0 app secret
{code}
Here {{AzureAuthCallbackHandler}} is a custom class that takes care of acquiring tokens from Azure. It is configured to fetch tokens from the same host that is used as the {{ProducerConfig.BOOTSTRAP_SERVERS_CONFIG}} setting, so a separate instance of the class should be created for each {{KafkaProducer}}. However, the client library creates it only once (for the single {{LoginManager}}) and uses it for all producers.

When both producers are used to send messages, this leads to many log entries like:
{code:java}
[Producer clientId=my-client-id] Error while fetching metadata with correlation id 164 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION}
{code}
and finally to:
{code:java}
org.apache.kafka.common.errors.TimeoutException: Topic my-topic not present in metadata after 60000 ms.
{code}
The second producer tries to fetch metadata from the cluster configured for the first producer and can't find the target topic there.
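
The sharing described above can be illustrated with a small stand-alone model. This is only a sketch and not Kafka's actual code: {{LoginCacheModel}}, {{Key}} and {{Login}} are hypothetical names, and the assumption that the key consists of exactly the JAAS config string and the handler class name is a simplification. It shows how a cache whose key ignores the bootstrap servers hands the second caller the object that was configured for the first one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified, hypothetical model of a login cache. NOT Kafka's implementation.
final class LoginCacheModel {

    // Cache key: only the JAAS config string and the handler class name.
    // The bootstrap servers are deliberately not part of the key.
    static final class Key {
        final String jaasConfig;
        final String handlerClass;

        Key(String jaasConfig, String handlerClass) {
            this.jaasConfig = jaasConfig;
            this.handlerClass = handlerClass;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Key)) {
                return false;
            }
            Key other = (Key) o;
            return jaasConfig.equals(other.jaasConfig) && handlerClass.equals(other.handlerClass);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jaasConfig, handlerClass);
        }
    }

    // Stand-in for a login manager plus its callback handler, configured once.
    static final class Login {
        final String configuredFor;

        Login(String bootstrapServers) {
            this.configuredFor = bootstrapServers;
        }
    }

    private final Map<Key, Login> instances = new HashMap<>();

    // Builds a Login only the first time a key is seen; later callers reuse it.
    Login acquire(String jaasConfig, String handlerClass, String bootstrapServers) {
        return instances.computeIfAbsent(
                new Key(jaasConfig, handlerClass), k -> new Login(bootstrapServers));
    }

    int size() {
        return instances.size();
    }

    public static void main(String[] args) {
        LoginCacheModel cache = new LoginCacheModel();
        String jaas = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;";
        Login first = cache.acquire(jaas, "AzureAuthCallbackHandler", "cluster-a:9093");
        Login second = cache.acquire(jaas, "AzureAuthCallbackHandler", "cluster-b:9093");
        System.out.println("same instance: " + (first == second));
        System.out.println("second producer uses login configured for: " + second.configuredFor);
    }
}
```

In this model the second {{acquire}} call returns the instance built for {{cluster-a:9093}}, which mirrors the symptom reported here.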
h3. Workaround

Add a unique JAAS config option for each Kafka cluster:
{code:java}
String options = "cluster=" + clusterName; //clusterName should be unique for each created KafkaProducer instance
properties.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " + options + ";");
{code}
The {{LoginManager}} map uses the {{SASL_JAAS_CONFIG}} string as part of its key, so adding a meaningless option to the string makes the client library create a different {{LoginManager}} for each {{KafkaProducer}} instance, and the problem disappears.
This happens in the {{org.apache.kafka.common.security.authenticator.LoginManager#acquireLoginManager}} method: a {{LoginMetadata}} instance is created from the {{SASL_JAAS_CONFIG}} value and used as the key in the {{LoginManager.DYNAMIC_INSTANCES}} map.
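
A small helper can keep the workaround in one place. The following is a minimal sketch: {{JaasConfigPerCluster}} and its methods are hypothetical names, the {{cluster}} option is the same arbitrary option used above, and the literal key {{sasl.jaas.config}} is the value of {{SaslConfigs.SASL_JAAS_CONFIG}}, written out so that the example compiles without {{kafka-clients}} on the classpath. Restricting the cluster name to a conservative character set and quoting the option value are assumptions made to avoid JAAS parsing surprises.

```java
import java.util.Properties;

// Hypothetical helper: derives a distinct JAAS config string per cluster.
final class JaasConfigPerCluster {

    static final String LOGIN_MODULE =
            "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule";

    // Returns a JAAS config whose only cluster-specific part is the extra option.
    static String jaasConfigFor(String clusterName) {
        // Assumption: allow only simple names so no escaping is needed.
        if (!clusterName.matches("[A-Za-z0-9._-]+")) {
            throw new IllegalArgumentException("Unsupported cluster name: " + clusterName);
        }
        return LOGIN_MODULE + " required cluster=\"" + clusterName + "\";";
    }

    // Copies the shared settings and sets a cluster-specific sasl.jaas.config.
    static Properties withClusterJaasConfig(Properties base, String clusterName) {
        Properties copy = new Properties();
        copy.putAll(base);
        copy.put("sasl.jaas.config", jaasConfigFor(clusterName));
        return copy;
    }

    public static void main(String[] args) {
        Properties shared = new Properties();
        shared.put("security.protocol", "SASL_SSL");
        shared.put("sasl.mechanism", "OAUTHBEARER");

        Properties forA = withClusterJaasConfig(shared, "cluster-a");
        Properties forB = withClusterJaasConfig(shared, "cluster-b");
        System.out.println(forA.getProperty("sasl.jaas.config"));
        System.out.println(forB.getProperty("sasl.jaas.config"));
    }
}
```

Each resulting {{Properties}} object would still need its own {{bootstrap.servers}} value and the callback handler settings before being passed to a {{KafkaProducer}}.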
h3. Suggested solution

Each {{KafkaProducer}} instance should have its own isolated authentication handling objects, regardless of how similar the configurations are. The {{SASL_LOGIN_CALLBACK_HANDLER_CLASS}} class should be instantiated for each producer individually, since its {{org.apache.kafka.common.security.auth.AuthenticateCallbackHandler#configure}} method is invoked with the producer's configuration, which can differ from one producer to another.

h3. Additional details
I've attached the callback handler and token implementations for reference.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)