You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Pirow Engelbrecht <pi...@etion.co.za> on 2019/10/25 07:22:35 UTC

Kubernetes and Kafka

Hello,

I am currently working on bringing up as simple as possible Kubernetes cluster with Kafka and Zookeeper for development purposes (using minikube). The cluster needs to expose a Kafka broker to the external networking environment (for interfacing to consumers and producers outside of the cluster).

I've managed to spin up both nginx, zookeeper and kafka following various tutorials on the web. Exposing the nginx as a node port is working well, but exposing the kafka broker in the same way is giving problems. Kafka-topics.sh is refusing to work with the -bootstrap-server argument (pointing to the kafka broker) in either the pod or the host, but does work with the -zookeeper argument pointing it to the zookeeper instance on port 2181 (on both the pod and the host (after exposing zookeeper)).

I've research this issue extensively and have set the Kafka KAFKA_ADVERTISED_PORT and KAFKA_ADVERTISED_HOST_NAME environment variables and can see that advertised.host.name and advertised.port matches from the kafka logs. I've also tried setting all the protocols to plaintext - no change. A lot of other people seem to have the same issue. There appears to be working deployments on GIThub, but is so complicated in terms of many zookeeper instances and many brokers, it is not really helping in solving the issue. Any help would be appreciated.

I am using Kafka 2.3.0 through the wurstmeister/kafka docker image. Using the official zookeeper docker image (3.5.6). All running on minikube v1.4.0 using Hyper-V.

My YML file for Kafka:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
        app: kafka
  serviceName: kafka-broker
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: BROKER_ID_COMMAND
          value: "echo ${HOSTNAME##*-}"
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_PORT
          value: "32000"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: "172.17.8.220"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
        - name: KAFKA_CREATE_TOPICS
          value: "mirror-registry:1:1,test-topic:1:1"
        - name: KAFKA_LOG_RETENTION_HOURS
          value: "-1"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: kafka-service
  name: kafka-service
spec:
  type: NodePort
  ports:
  - name: kafka-port
    port: 9092
    nodePort: 32000
    targetPort: 9092
  selector:
    app: kafka
  externalIPs:
    - 172.17.8.220

Kafka configuration from Kafka container log:
        advertised.host.name = 172.17.8.220
        advertised.listeners = null
        advertised.port = 32000
        alter.config.policy.class.name = null
        alter.log.dirs.replication.quota.window.num = 11
        alter.log.dirs.replication.quota.window.size.seconds = 1
        authorizer.class.name =
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 0
        broker.id.generation.enable = true
        broker.rack = null
        client.quota.callback.class = null
        compression.type = producer
        connection.failed.authentication.delay.ms = 100
        connections.max.idle.ms = 600000
        connections.max.reauth.ms = 0
        control.plane.listener.name = null
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delegation.token.expiry.check.interval.ms = 3600000
        delegation.token.expiry.time.ms = 86400000
        delegation.token.master.key = null
        delegation.token.max.lifetime.ms = 604800000
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 1800000
        group.max.size = 2147483647
        group.min.session.timeout.ms = 6000
        host.name =
        inter.broker.listener.name = null
        inter.broker.protocol.version = 2.3-IV1
        kafka.metrics.polling.interval.secs = 10
        kafka.metrics.reporters = []
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
        listeners = null
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.max.compaction.lag.ms = 9223372036854775807
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /kafka/kafka-logs-kafka-0
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.downconversion.enable = true
        log.message.format.version = 2.3-IV1
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = -1
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections = 2147483647
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        max.incremental.fetch.session.cache.slots = 1000
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.alter.log.dirs.threads = null
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 10080
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
        password.encoder.iterations = 4096
        password.encoder.key.length = 128
        password.encoder.keyfactory.algorithm = null
        password.encoder.old.secret = null
        password.encoder.secret = null
        port = 9092
        principal.builder.class = null
        producer.purgatory.purge.interval.requests = 1000
        queued.max.request.bytes = -1
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.client.callback.handler.class = null
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism.inter.broker.protocol = GSSAPI
        sasl.server.callback.handler.class = null
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = []
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.principal.mapping.rules = [DEFAULT]
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.connect = zoo1:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.max.in.flight.requests = 10
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000

Zookeeper YML:

apiVersion: v1
kind: ReplicationController
metadata:
  name: zookeeper
spec:
  replicas: 1
  selector:
    app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: zookeeper
        ports:
        - containerPort: 2181
        - containerPort: 2888
        - containerPort: 3888
        env:
        - name: ZOOKEEPER_ID
          value: "1"
        - name: ZOOKEEPER_SERVER_1
          value: zoo1
---
apiVersion: v1
kind: Service
metadata:
  name: zoo1
spec:
  type: LoadBalancer
  ports:
  - name: port-2181
    port: 2181
    protocol: TCP
  - name: port-2888
    port: 2888
    protocol: TCP
  - name: port-3888
    port: 3888
    protocol: TCP
  selector:
    app: zookeeper

And finally the Nginx YML that I have been using as a sanity check:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: nginx
  name: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - image: nginx:1.7.9
        name: nginx
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: nginx-service
  name: nginx-service
spec:
  type: NodePort
  ports:
  - name: nginx-port
    port: 80
    nodePort: 32080
    targetPort: 80
  selector:
    app: nginx
  externalIPs:
    - 172.17.8.220

Thanks


Pirow Engelbrecht | Senior Design Engineer
Tel +27 12 678 9740 (ext. 9879) | Cell +27 63 148 3376

76 Regency Drive | Irene | Centurion | 0157<https://goo.gl/maps/v9ZbwjqpPyL2>


[create-transition]<https://etion.co.za/>


Facebook<https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> | YouTube<https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> | LinkedIn<https://www.linkedin.com/company/etionltd> | Twitter<https://twitter.com/Etionlimited> | Instagram<https://www.instagram.com/Etionlimited/>