Posted to user@flink.apache.org by hezekiah maina <he...@gmail.com> on 2020/10/07 13:55:54 UTC
Statefun + Confluent Fully-managed Kafka
Hi,
I'm trying to use Stateful Functions with Kafka as my ingress and egress.
I'm using Confluent's fully managed Kafka and I'm having trouble
adding my authentication details to the module.yaml file.
Here are my current config details:
version: "1.0"
module:
  meta:
    type: remote
  spec:
    functions:
      - function:
          meta:
            kind: http
            type: example/greeter
          spec:
            endpoint: <https-endpoint>
            states:
              - seen_count
            maxNumBatchRequests: 500
            timeout: 2min
    ingresses:
      - ingress:
          meta:
            type: statefun.kafka.io/routable-protobuf-ingress
            id: example/names
          spec:
            address: <confluent-bootstrap-server>
            consumerGroupId: statefun-consumer-group
            topics:
              - topic: names
                typeUrl: com.googleapis/example.GreetRequest
                targets:
                  - example/greeter
            properties:
              - bootstrap.servers:<confluent-bootstrap-server>
              - security.protocol: SASL_SSL
              - sasl.mechanism: PLAIN
              - sasl.jaas.config:
                  org.apache.kafka.common.security.plain.PlainLoginModule required
                  username="USERNAME" password="PASSWORD";
              - ssl.endpoint.identification.algorithm: https
    egresses:
      - egress:
          meta:
            type: statefun.kafka.io/generic-egress
            id: example/greets
          spec:
            address: <confluent-bootstrap-server>
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000
            properties:
              - bootstrap.servers: <confluent-bootstrap-server>
              - security.protocol: SASL_SSL
              - sasl.mechanisms: PLAIN
              - sasl.jaas.config:
                  org.apache.kafka.common.security.plain.PlainLoginModule required
                  username="USERNAME" password="PASSWORD";
              - ssl.endpoint.identification.algorithm: https
After running docker-compose with master and worker containers, I'm
getting this error:
Could not find a 'KafkaClient' entry in the JAAS configuration. System
property 'java.security.auth.login.config' is
/tmp/jaas-2846080966990890307.conf
The logged producer config:
worker_1 | 2020-10-07 13:38:08,489 INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
worker_1 | acks = 1
worker_1 | batch.size = 16384
worker_1 | bootstrap.servers = [https://---.asia-southeast1.gcp.confluent.cloud:9092]
worker_1 | buffer.memory = 33554432
worker_1 | client.dns.lookup = default
worker_1 | client.id =
worker_1 | compression.type = none
worker_1 | connections.max.idle.ms = 540000
worker_1 | delivery.timeout.ms = 120000
worker_1 | enable.idempotence = false
worker_1 | interceptor.classes = []
worker_1 | key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
worker_1 | linger.ms = 0
worker_1 | max.block.ms = 60000
worker_1 | max.in.flight.requests.per.connection = 5
worker_1 | max.request.size = 1048576
worker_1 | metadata.max.age.ms = 300000
worker_1 | metric.reporters = []
worker_1 | metrics.num.samples = 2
worker_1 | metrics.recording.level = INFO
worker_1 | metrics.sample.window.ms = 30000
worker_1 | partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
worker_1 | receive.buffer.bytes = 32768
worker_1 | reconnect.backoff.max.ms = 1000
worker_1 | reconnect.backoff.ms = 50
worker_1 | request.timeout.ms = 30000
worker_1 | retries = 2147483647
worker_1 | retry.backoff.ms = 100
worker_1 | sasl.client.callback.handler.class = null
worker_1 | sasl.jaas.config = null
worker_1 | sasl.kerberos.kinit.cmd = /usr/bin/kinit
worker_1 | sasl.kerberos.min.time.before.relogin = 60000
worker_1 | sasl.kerberos.service.name = null
worker_1 | sasl.kerberos.ticket.renew.jitter = 0.05
worker_1 | sasl.kerberos.ticket.renew.window.factor = 0.8
worker_1 | sasl.login.callback.handler.class = null
worker_1 | sasl.login.class = null
worker_1 | sasl.login.refresh.buffer.seconds = 300
worker_1 | sasl.login.refresh.min.period.seconds = 60
worker_1 | sasl.login.refresh.window.factor = 0.8
worker_1 | sasl.login.refresh.window.jitter = 0.05
worker_1 | sasl.mechanism = GSSAPI
worker_1 | security.protocol = SASL_SSL
worker_1 | send.buffer.bytes = 131072
worker_1 | ssl.cipher.suites = null
worker_1 | ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
worker_1 | ssl.endpoint.identification.algorithm = https
worker_1 | ssl.key.password = null
worker_1 | ssl.keymanager.algorithm = SunX509
worker_1 | ssl.keystore.location = null
worker_1 | ssl.keystore.password = null
worker_1 | ssl.keystore.type = JKS
worker_1 | ssl.protocol = TLS
worker_1 | ssl.provider = null
worker_1 | ssl.secure.random.implementation = null
worker_1 | ssl.trustmanager.algorithm = PKIX
worker_1 | ssl.truststore.location = null
worker_1 | ssl.truststore.password = null
worker_1 | ssl.truststore.type = JKS
worker_1 | transaction.timeout.ms = 100000
worker_1 | transactional.id = null
worker_1 | value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
worker_1 |
Is there something that I'm missing?
Re: Statefun + Confluent Fully-managed Kafka
Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Hezekiah,
I've confirmed that the Kafka properties set in the module specification
file (module.yaml) are indeed correctly being parsed and used to construct
the internal Kafka clients.
StateFun / Flink does not alter or modify the properties.
So the problem is most likely in your property settings, which are
preventing the Kafka client itself from picking up the `sasl.jaas.config`
property value.
From the resolved producer config in the logs, it looks like your
`sasl.jaas.config` is null, but all other properties are being picked up
correctly.
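Please check your properties again, and make sure that the keys are spelled
correctly and that the values conform to the JAAS config format.
For starters, there's a typo in the egress properties: the key is written as
`sasl.mechanisms`, with an extra 's', where it should be `sasl.mechanism`.
Because of the misspelled key, the client falls back to the default
mechanism, which matches the `sasl.mechanism = GSSAPI` line in your logged
producer config.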
I've verified that the following properties will work, with SASL JAAS
config being picked up correctly:
```
egresses:
  - egress:
      meta:
        type: statefun.kafka.io/generic-egress
        id: example/greets
      spec:
        address: <confluent-bootstrap-server>
        deliverySemantic:
          type: exactly-once
          transactionTimeoutMillis: 100000
        properties:
          - security.protocol: SASL_SSL
          - sasl.mechanism: PLAIN
          - sasl.jaas.config:
              org.apache.kafka.common.security.plain.PlainLoginModule required
              username="USERNAME" password="PASSWORD";
          - ssl.endpoint.identification.algorithm: https
```
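A misspelled key like `sasl.mechanisms` is easy to miss, because an
unrecognized key does not end up as the setting you intended. One way to
catch it before deploying is a small pre-flight check over the property
keys. The following is only a rough Python sketch, not part of StateFun or
Kafka: `KNOWN_KEYS` and `check_property_keys` are hypothetical names, and the
allow-list is limited to the keys that appear in this thread.

```python
import difflib

# Hypothetical allow-list: only the Kafka client property names
# that appear in this thread, not the full set Kafka supports.
KNOWN_KEYS = [
    "bootstrap.servers",
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
    "ssl.endpoint.identification.algorithm",
]


def check_property_keys(properties):
    """Return (unknown_key, closest_known_key_or_None) for each unknown key."""
    problems = []
    for key in properties:
        if key in KNOWN_KEYS:
            continue
        # Suggest the most similar known key, if one is close enough.
        matches = difflib.get_close_matches(key, KNOWN_KEYS, n=1, cutoff=0.8)
        problems.append((key, matches[0] if matches else None))
    return problems


if __name__ == "__main__":
    # Property keys and placeholder values taken from the egress in the question.
    egress_properties = {
        "security.protocol": "SASL_SSL",
        "sasl.mechanisms": "PLAIN",  # the misspelled key
        "sasl.jaas.config": (
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
            'username="USERNAME" password="PASSWORD";'
        ),
        "ssl.endpoint.identification.algorithm": "https",
    }
    for key, suggestion in check_property_keys(egress_properties):
        print(f"unknown key {key!r}, did you mean {suggestion!r}?")
```

A check like this would need the full list of supported client properties
to be useful in practice; the short list above is only for illustration.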
Cheers,
Gordon
On Wed, Oct 7, 2020 at 11:36 PM Till Rohrmann <tr...@apache.org> wrote:
> Hi Hezekiah, thanks for reporting this issue. I am pulling Gordon and Igal
> in who might be able to help you with this problem.
>
> Cheers,
> Till
Re: Statefun + Confluent Fully-managed Kafka
Posted by Till Rohrmann <tr...@apache.org>.
Hi Hezekiah, thanks for reporting this issue. I am pulling Gordon and Igal
in who might be able to help you with this problem.
Cheers,
Till