Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/22 15:05:06 UTC

[GitHub] [pulsar] brigen opened a new issue, #16181: Unauthorized; error code: 401 when using pulsar-io-kafka connector with schema-registry that requires auth

brigen opened a new issue, #16181:
URL: https://github.com/apache/pulsar/issues/16181

   The Kafka source connector fails with com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
   when it is used with a schema registry that requires basic auth.
   
   #### Steps to reproduce
   Start the connector against a Kafka broker with KafkaAvroDeserializer as the value deserializer, so that it fetches schemas from the schema registry. Enable basic auth on the schema registry.
   Use className: org.apache.pulsar.io.kafka.KafkaBytesSource
   Config file of the pulsar-io-kafka connector:
       bootstrapServers: "localhost:9092"
       topic: abcV1
       groupId: "group.V1.consumer-1"
       valueDeserializationClass: io.confluent.kafka.serializers.KafkaAvroDeserializer
       consumerConfigProperties:
         client.id: "avs.sit"
         security.protocol: "SASL_SSL"
         sasl.mechanism: "PLAIN"
         acks: "all"
         client.dns.lookup: use_all_dns_ips
         sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"password\";"
         value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
         basic.auth.credentials.source: USER_INFO
         specific.avro.reader: true
         schema.registry.url: https://your-schema-registry-url
         basic.auth.user.info: USER:PASSWORD
   
   
   #### System configuration
   **Pulsar version**: 2.9.2
   
   I already dug into the source code, and the problem might be here,
   in KafkaBytesSource.java at line 110:
   
   private void initSchemaCache(Properties props) {
       KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
       List<String> urls = config.getSchemaRegistryUrls();
       int maxSchemaObject = config.getMaxSchemasPerSubject();
       SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
       log.info("initializing SchemaRegistry Client, urls:{}, maxSchemasPerSubject: {}", urls, maxSchemaObject);
       schemaCache = new AvroSchemaCache(schemaRegistryClient);
   }
   
   CachedSchemaRegistryClient is constructed without the properties, so it creates a RestService without them too.
   The default RestService never calls configure, so even though we pass basic.auth.user.info, it never reaches RestService, and the request to the schema registry is sent without an Authorization header.
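
The suspected failure mode can be sketched with a small stand-alone model. This is only an illustration under assumptions: `SketchRestClient` and `SchemaRegistryAuthSketch` are hypothetical stand-ins written for this example, not the Confluent `RestService` or the Pulsar connector code. The only external fact relied on is that HTTP basic auth sends `Authorization: Basic <base64(user:password)>`. The property keys and placeholder credentials are the ones from the config above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a REST client; NOT the Confluent RestService. */
class SketchRestClient {
    // Stays null unless configure() is called with the connector properties.
    private String userInfo;

    /** Reads basic-auth credentials from the properties, if USER_INFO is selected. */
    void configure(Map<String, ?> props) {
        if ("USER_INFO".equals(props.get("basic.auth.credentials.source"))) {
            Object value = props.get("basic.auth.user.info");
            userInfo = (value == null) ? null : value.toString();
        }
    }

    /** Returns the Authorization header value, or empty when no credentials were configured. */
    Optional<String> authorizationHeader() {
        if (userInfo == null) {
            return Optional.empty();
        }
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }
}

/** Hypothetical driver comparing the two construction paths. */
class SchemaRegistryAuthSketch {
    /** The auth-related subset of consumerConfigProperties from the report. */
    static Map<String, String> connectorProps() {
        Map<String, String> props = new HashMap<>();
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "USER:PASSWORD");
        return props;
    }

    /** Models the reported path: the client is created and configure() is never called. */
    static SketchRestClient withoutProperties() {
        return new SketchRestClient();
    }

    /** Models the expected path: the properties reach the client. */
    static SketchRestClient withProperties(Map<String, ?> props) {
        SketchRestClient client = new SketchRestClient();
        client.configure(props);
        return client;
    }

    public static void main(String[] args) {
        System.out.println("without properties: "
                + withoutProperties().authorizationHeader().orElse("<no Authorization header>"));
        System.out.println("with properties:    "
                + withProperties(connectorProps()).authorizationHeader().orElse("<no Authorization header>"));
    }
}
```

In this model the first client has no header to send, which is the situation a registry with basic auth would answer with 401. A fix in the connector would presumably need to hand the configured properties to the schema registry client; whether the Confluent client version bundled with Pulsar 2.9.2 offers a constructor for that is not verified here.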


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on issue #16181: Unauthorized; error code: 401 when using pulsar-io-kafka connector with schema-registry that requires auth

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #16181:
URL: https://github.com/apache/pulsar/issues/16181#issuecomment-1193039166

   The issue had no activity for 30 days, mark with Stale label.

