Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/22 15:05:06 UTC
[GitHub] [pulsar] brigen opened a new issue, #16181: Unauthorized; error code: 401 when using pulsar-io-kafka connector with schema-registry that requires auth
brigen opened a new issue, #16181:
URL: https://github.com/apache/pulsar/issues/16181
I got com.google.common.util.concurrent.UncheckedExecutionException: java.lang.RuntimeException: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
when trying to use the kafka-source connector with a schema registry that requires basic auth.
#### Steps to reproduce
Start the connector against a Kafka broker, using KafkaAvroDeserializer, which fetches schemas from the schema registry. Enable basic auth on the schema registry.
Use className: org.apache.pulsar.io.kafka.KafkaBytesSource
Config file of the pulsar-io-kafka connector:
bootstrapServers: "localhost:9092"
topic: abcV1
groupId: "group.V1.consumer-1"
valueDeserializationClass: io.confluent.kafka.serializers.KafkaAvroDeserializer
consumerConfigProperties:
  client.id: "avs.sit"
  security.protocol: "SASL_SSL"
  sasl.mechanism: "PLAIN"
  acks: "all"
  client.dns.lookup: use_all_dns_ips
  sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"password\";"
  value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
  basic.auth.credentials.source: USER_INFO
  specific.avro.reader: true
  schema.registry.url: https://your-schema-registry-url
  basic.auth.user.info: USER:PASSWORD
#### System configuration
**Pulsar version**: 2.9.2
I already dug into the source code and saw that the problem might be here,
in KafkaBytesSource.java, line 110:
private void initSchemaCache(Properties props) {
    KafkaAvroDeserializerConfig config = new KafkaAvroDeserializerConfig(props);
    List<String> urls = config.getSchemaRegistryUrls();
    int maxSchemaObject = config.getMaxSchemasPerSubject();
    SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(urls, maxSchemaObject);
    log.info("initializing SchemaRegistry Client, urls:{}, maxSchemasPerSubject: {}", urls, maxSchemaObject);
    schemaCache = new AvroSchemaCache(schemaRegistryClient);
}
CachedSchemaRegistryClient is constructed without the consumer properties, so it creates a RestService without them as well.
The default RestService never calls configure, so even though we pass basic.auth.user.info, the credentials never reach RestService and the request to the schema registry is sent without an Authorization header.
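To make the missing header concrete, here is a minimal sketch of what a client that did read the properties would be expected to send, assuming standard HTTP Basic authentication (Base64 of `user:password`). The class `BasicAuthHeaderSketch` and the helper `authorizationHeader` are hypothetical names used only for illustration; they are not part of Pulsar or the Confluent client, and the actual client may derive the header differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;
import java.util.Properties;

public class BasicAuthHeaderSketch {

    // Hypothetical helper (not a Pulsar or Confluent API): derives the value of the
    // Authorization header from the consumer properties, or returns empty when the
    // credentials are not available to the caller.
    static Optional<String> authorizationHeader(Properties props) {
        String source = props.getProperty("basic.auth.credentials.source");
        String userInfo = props.getProperty("basic.auth.user.info");
        if (!"USER_INFO".equals(source) || userInfo == null) {
            return Optional.empty();
        }
        // HTTP Basic auth: "Basic " followed by Base64("user:password").
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }

    public static void main(String[] args) {
        // Properties as given in consumerConfigProperties above.
        Properties configured = new Properties();
        configured.setProperty("basic.auth.credentials.source", "USER_INFO");
        configured.setProperty("basic.auth.user.info", "USER:PASSWORD");
        System.out.println(authorizationHeader(configured).orElse("<no Authorization header>"));

        // A client built without the properties has nothing to derive a header from,
        // which matches the 401 described in this issue.
        System.out.println(authorizationHeader(new Properties()).orElse("<no Authorization header>"));
    }
}
```

The second call mirrors the situation in initSchemaCache: the credentials exist in props, but the registry client never sees them, so the request goes out unauthenticated.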
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] github-actions[bot] commented on issue #16181: Unauthorized; error code: 401 when using pulsar-io-kafka connector with schema-registry that requires auth
Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on issue #16181:
URL: https://github.com/apache/pulsar/issues/16181#issuecomment-1193039166
The issue had no activity for 30 days, mark with Stale label.