Posted to users@kafka.apache.org by Huy Nguyen <vi...@gmail.com> on 2022/11/22 07:59:01 UTC

Cannot start Python Kafka consumer using SASL/SCRAM

I'm having an issue with Kafka. Whenever the consumer uses SASL_SSL or
SASL_PLAINTEXT, it can't start. When I switch the consumer to PLAINTEXT
or SSL (without SASL), it works fine.

Below are my configuration and the error:


*config/server.properties:*
listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093,SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095


ssl.keystore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/KeyStore.jks
ssl.keystore.password=123456
ssl.key.password=123456
ssl.truststore.location=/root/zoo-kaf/kafka_2.12-3.2.3/ssl/truststore.jks
ssl.truststore.password=123456
sasl.enabled.mechanisms=SCRAM-SHA-512,PLAIN
listener.name.sasl_ssl.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
listener.name.sasl_plaintext.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
   username="alice" \
   password="alice-secret" \
   user_alice="alice-secret";
...
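
As a sanity check that each client port lines up with the intended security protocol, the `listeners` value above can be split into a name-to-address map. This is only a sketch: `parse_listeners` is a hypothetical helper, and it assumes the listener names are the security protocol names themselves (that is, no custom `listener.security.protocol.map`).

```python
def parse_listeners(value):
    """Map each listener name to a (host, port) pair.

    Hypothetical helper: expects the comma-separated NAME://host:port
    format used by the `listeners` property in server.properties.
    """
    listeners = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, _, address = entry.partition("://")
        # rpartition keeps any colons in the host part intact
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the server.properties shown above
LISTENERS = (
    "PLAINTEXT://localhost:9092,SSL://localhost:9093,"
    "SASL_SSL://localhost:9094,SASL_PLAINTEXT://localhost:9095"
)

ports = {name: port for name, (_host, port) in parse_listeners(LISTENERS).items()}
```

With this configuration, a SASL_SSL client should target port 9094 and a SASL_PLAINTEXT client port 9095.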


*consumer.py*
Also, can the username alice be used for `sasl_plain_username`?

from kafka import KafkaConsumer
import logging

logging.basicConfig(level=logging.DEBUG)
# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer(
    'test-topic',
    bootstrap_servers='127.0.0.1:9094',
    security_protocol="SASL_SSL",
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='alice',
    sasl_plain_password='alice-secret',
    ssl_check_hostname=False,
    ssl_cafile="/etc/ssl/certs/ca-cert",
)
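
To make the SASL_SSL and SASL_PLAINTEXT attempts easier to compare, the keyword arguments can be built in one place. The sketch below is an assumption about how one might organize this, not part of the original script: `scram_consumer_config` and `make_consumer` are hypothetical helpers, while the keyword names are the same ones passed to `KafkaConsumer` above. As far as I understand the kafka-python documentation, `sasl_plain_username` and `sasl_plain_password` carry the credentials for the SCRAM mechanisms as well as for PLAIN.

```python
def scram_consumer_config(host, port, username, password, cafile=None, use_ssl=True):
    """Build KafkaConsumer keyword arguments for SCRAM-SHA-512 (hypothetical helper)."""
    config = {
        "bootstrap_servers": f"{host}:{port}",
        "security_protocol": "SASL_SSL" if use_ssl else "SASL_PLAINTEXT",
        "sasl_mechanism": "SCRAM-SHA-512",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }
    if use_ssl:
        # TLS settings only apply to the SASL_SSL listener
        config["ssl_check_hostname"] = False
        config["ssl_cafile"] = cafile
    return config


def make_consumer(topic, config):
    """Create the consumer; kafka-python is imported lazily so the
    configuration can be inspected without the package or a broker."""
    from kafka import KafkaConsumer
    return KafkaConsumer(topic, **config)


ssl_config = scram_consumer_config(
    "127.0.0.1", 9094, "alice", "alice-secret", cafile="/etc/ssl/certs/ca-cert"
)
plaintext_config = scram_consumer_config(
    "127.0.0.1", 9095, "alice", "alice-secret", use_ssl=False
)
```

Calling `make_consumer('test-topic', ssl_config)` would then be equivalent to the constructor call above, and swapping in `plaintext_config` targets the SASL_PLAINTEXT listener on port 9095.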

Consumer log:

DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<disconnected> [IPv4 ('127.0.0.1', 9094)]>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<disconnected> [IPv4 ('127.0.0.1', 9094)]>: setting socket option (6, 1, 1)
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<connecting> [IPv4 ('127.0.0.1', 9094)]>: connecting to 127.0.0.1:9094
[('127.0.0.1', 9094) IPv4]
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<connecting> [IPv4 ('127.0.0.1', 9094)]>: established TCP connection
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<connecting> [IPv4 ('127.0.0.1', 9094)]>: initiating SSL handshake
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<handshake> [IPv4 ('127.0.0.1', 9094)]>: wrapping socket in ssl context
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<handshake> [IPv4 ('127.0.0.1', 9094)]>: completed SSL handshake.
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<handshake> [IPv4 ('127.0.0.1', 9094)]>: initiating SASL authentication
DEBUG:kafka.protocol.parser:Sending request
SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<authenticating> [IPv4 ('127.0.0.1', 9094)]> Request 1:
SaslHandShakeRequest_v0(mechanism='SCRAM-SHA-512')
DEBUG:kafka.protocol.parser:Received correlation id: 1
DEBUG:kafka.protocol.parser:Processing response SaslHandShakeResponse_v0
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<authenticating> [IPv4 ('127.0.0.1', 9094)]> Response 1 (0.7309913635253906
ms): SaslHandShakeResponse_v0(error_code=0,
enabled_mechanisms=['SCRAM-SHA-512', 'PLAIN'])
ERROR:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<authenticating> [IPv4 ('127.0.0.1', 9094)]>: Error receiving reply from
server
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 692, in
_try_authenticate_scram
    (data_len,) = struct.unpack('>i', self._recv_bytes_blocking(4))
  File "/usr/local/lib/python3.6/site-packages/kafka/conn.py", line 616, in
_recv_bytes_blocking
    raise ConnectionError('Connection reset during recv')
ConnectionError: Connection reset during recv
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<authenticating> [IPv4 ('127.0.0.1', 9094)]>: Closing connection.
KafkaConnectionError: <BrokerConnection node_id=bootstrap-0 host=
127.0.0.1:9094 <authenticating> [IPv4 ('127.0.0.1', 9094)]>: Connection
reset during recv
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=127.0.0.1:9094
<authenticating> [IPv4 ('127.0.0.1', 9094)]>: reconnect backoff
0.055656264781808004 after 1 failures
Traceback (most recent call last):
  File "consumer-scram-ssl.py", line 6, in <module>
    consumer = KafkaConsumer('test-topic', bootstrap_servers='127.0.0.1:9094',
security_protocol="SASL_SSL", sasl_mechanism='SCRAM-SHA-512',
sasl_plain_username='alice', sasl_plain_password='alice-secret',
ssl_check_hostname=False, ssl_cafile="/etc/ssl/certs/ca-cert")

  File "/usr/local/lib/python3.6/site-packages/kafka/consumer/group.py",
line 356, in __init__
    self._client = KafkaClient(metrics=self._metrics, **self.config)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line
244, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "/usr/local/lib/python3.6/site-packages/kafka/client_async.py", line
900, in check_version
    raise Errors.NoBrokersAvailable()

The broker doesn't log anything when this happens.