You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Pirow Engelbrecht <pi...@etion.co.za> on 2019/10/25 07:22:35 UTC
Kubernetes and Kafka
Hello,
I am currently working on bringing up as simple as possible Kubernetes cluster with Kafka and Zookeeper for development purposes (using minikube). The cluster needs to expose a Kafka broker to the external networking environment (for interfacing to consumers and producers outside of the cluster).
I've managed to spin up both nginx, zookeeper and kafka following various tutorials on the web. Exposing the nginx as a node port is working well, but exposing the kafka broker in the same way is giving problems. Kafka-topics.sh is refusing to work with the -bootstrap-server argument (pointing to the kafka broker) in either the pod or the host, but does work with the -zookeeper argument pointing it to the zookeeper instance on port 2181 (on both the pod and the host (after exposing zookeeper)).
I've research this issue extensively and have set the Kafka KAFKA_ADVERTISED_PORT and KAFKA_ADVERTISED_HOST_NAME environment variables and can see that advertised.host.name and advertised.port matches from the kafka logs. I've also tried setting all the protocols to plaintext - no change. A lot of other people seem to have the same issue. There appears to be working deployments on GIThub, but is so complicated in terms of many zookeeper instances and many brokers, it is not really helping in solving the issue. Any help would be appreciated.
I am using Kafka 2.3.0 through the wurstmeister/kafka docker image. Using the official zookeeper docker image (3.5.6). All running on minikube v1.4.0 using Hyper-V.
My YML file for Kafka:
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
labels:
app: kafka
spec:
replicas: 1
selector:
matchLabels:
app: kafka
serviceName: kafka-broker
template:
metadata:
labels:
app: kafka
spec:
containers:
- name: kafka
image: wurstmeister/kafka
ports:
- containerPort: 9092
env:
- name: BROKER_ID_COMMAND
value: "echo ${HOSTNAME##*-}"
- name: KAFKA_PORT
value: "9092"
- name: KAFKA_ADVERTISED_PORT
value: "32000"
- name: KAFKA_ADVERTISED_HOST_NAME
value: "172.17.8.220"
- name: KAFKA_ZOOKEEPER_CONNECT
value: zoo1:2181
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
- name: KAFKA_CREATE_TOPICS
value: "mirror-registry:1:1,test-topic:1:1"
- name: KAFKA_LOG_RETENTION_HOURS
value: "-1"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
---
apiVersion: v1
kind: Service
metadata:
labels:
app: kafka-service
name: kafka-service
spec:
type: NodePort
ports:
- name: kafka-port
port: 9092
nodePort: 32000
targetPort: 9092
selector:
app: kafka
externalIPs:
- 172.17.8.220
Kafka configuration from Kafka container log:
advertised.host.name = 172.17.8.220
advertised.listeners = null
advertised.port = 32000
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 0
group.max.session.timeout.ms = 1800000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 2.3-IV1
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
listeners = null
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.max.compaction.lag.ms = 9223372036854775807
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /kafka/kafka-logs-kafka-0
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.3-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = -1
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections = 2147483647
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 1000012
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 1
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 1
offsets.topic.segment.bytes = 104857600
password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
password.encoder.iterations = 4096
password.encoder.key.length = 128
password.encoder.keyfactory.algorithm = null
password.encoder.old.secret = null
password.encoder.secret = null
port = 9092
principal.builder.class = null
producer.purgatory.purge.interval.requests = 1000
queued.max.request.bytes = -1
queued.max.requests = 500
quota.consumer.default = 9223372036854775807
quota.producer.default = 9223372036854775807
quota.window.num = 11
quota.window.size.seconds = 1
replica.fetch.backoff.ms = 1000
replica.fetch.max.bytes = 1048576
replica.fetch.min.bytes = 1
replica.fetch.response.max.bytes = 10485760
replica.fetch.wait.max.ms = 500
replica.high.watermark.checkpoint.interval.ms = 5000
replica.lag.time.max.ms = 10000
replica.socket.receive.buffer.bytes = 65536
replica.socket.timeout.ms = 30000
replication.quota.window.num = 11
replication.quota.window.size.seconds = 1
request.timeout.ms = 30000
reserved.broker.max.id = 1000
sasl.client.callback.handler.class = null
sasl.enabled.mechanisms = [GSSAPI]
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.principal.to.local.rules = [DEFAULT]
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism.inter.broker.protocol = GSSAPI
sasl.server.callback.handler.class = null
security.inter.broker.protocol = PLAINTEXT
socket.receive.buffer.bytes = 102400
socket.request.max.bytes = 104857600
socket.send.buffer.bytes = 102400
ssl.cipher.suites = []
ssl.client.auth = none
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.principal.mapping.rules = [DEFAULT]
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
transaction.max.timeout.ms = 900000
transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
transaction.state.log.load.buffer.size = 5242880
transaction.state.log.min.isr = 1
transaction.state.log.num.partitions = 50
transaction.state.log.replication.factor = 1
transaction.state.log.segment.bytes = 104857600
transactional.id.expiration.ms = 604800000
unclean.leader.election.enable = false
zookeeper.connect = zoo1:2181
zookeeper.connection.timeout.ms = 6000
zookeeper.max.in.flight.requests = 10
zookeeper.session.timeout.ms = 6000
zookeeper.set.acl = false
zookeeper.sync.time.ms = 2000
Zookeeper YML:
apiVersion: v1
kind: ReplicationController
metadata:
name: zookeeper
spec:
replicas: 1
selector:
app: zookeeper
template:
metadata:
labels:
app: zookeeper
spec:
containers:
- name: zookeeper
image: zookeeper
ports:
- containerPort: 2181
- containerPort: 2888
- containerPort: 3888
env:
- name: ZOOKEEPER_ID
value: "1"
- name: ZOOKEEPER_SERVER_1
value: zoo1
---
apiVersion: v1
kind: Service
metadata:
name: zoo1
spec:
type: LoadBalancer
ports:
- name: port-2181
port: 2181
protocol: TCP
- name: port-2888
port: 2888
protocol: TCP
- name: port-3888
port: 3888
protocol: TCP
selector:
app: zookeeper
And finally the Nginx YML that I have been using as a sanity check:
apiVersion: apps/v1
kind: Deployment
metadata:
labels:
app: nginx
name: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- image: nginx:1.7.9
name: nginx
ports:
- containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
labels:
app: nginx-service
name: nginx-service
spec:
type: NodePort
ports:
- name: nginx-port
port: 80
nodePort: 32080
targetPort: 80
selector:
app: nginx
externalIPs:
- 172.17.8.220
Thanks
Pirow Engelbrecht | Senior Design Engineer
Tel +27 12 678 9740 (ext. 9879) | Cell +27 63 148 3376
76 Regency Drive | Irene | Centurion | 0157<https://goo.gl/maps/v9ZbwjqpPyL2>
[create-transition]<https://etion.co.za/>
Facebook<https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> | YouTube<https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> | LinkedIn<https://www.linkedin.com/company/etionltd> | Twitter<https://twitter.com/Etionlimited> | Instagram<https://www.instagram.com/Etionlimited/>