Posted to dev@kafka.apache.org by "Sergey Alaev (JIRA)" <ji...@apache.org> on 2016/05/23 15:17:12 UTC

[jira] [Created] (KAFKA-3746) InvalidReceiveException when connecting to broker over SSL

Sergey Alaev created KAFKA-3746:
-----------------------------------

             Summary: InvalidReceiveException when connecting to broker over SSL
                 Key: KAFKA-3746
                 URL: https://issues.apache.org/jira/browse/KAFKA-3746
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 0.9.0.1
         Environment: 3-node cluster on localhost
            Reporter: Sergey Alaev


When calling KafkaConsumer.poll(), the server closes the connection with an InvalidReceiveException. Strangely, this is reproduced only with SSL enabled between consumer and broker. We do not use SSL for inter-broker communication.

Consumer configuration:
{code}
[2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] [INFO]: ConsumerConfig values: 
	metric.reporters = []
	metadata.max.age.ms = 300000
	value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
	group.id = sds
	partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
	reconnect.backoff.ms = 50
	sasl.kerberos.ticket.renew.window.factor = 0.8
	max.partition.fetch.bytes = 1048576
	bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
	retry.backoff.ms = 100
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	ssl.keystore.type = JKS
	ssl.trustmanager.algorithm = PKIX
	enable.auto.commit = false
	ssl.key.password = [hidden]
	fetch.max.wait.ms = 500
	sasl.kerberos.min.time.before.relogin = 60000
	connections.max.idle.ms = 540000
	ssl.truststore.password = [hidden]
	session.timeout.ms = 30000
	metrics.num.samples = 2
	client.id = 
	ssl.endpoint.identification.algorithm = null
	key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
	ssl.protocol = TLS
	check.crcs = true
	request.timeout.ms = 40000
	ssl.provider = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
	heartbeat.interval.ms = 3000
	auto.commit.interval.ms = 1000
	receive.buffer.bytes = 32768
	ssl.cipher.suites = null
	ssl.truststore.type = JKS
	security.protocol = SSL
	ssl.truststore.location = src/main/resources/ssl/kafka.client.truststore.jks
	ssl.keystore.password = [hidden]
	ssl.keymanager.algorithm = SunX509
	metrics.sample.window.ms = 30000
	fetch.min.bytes = 1
	send.buffer.bytes = 131072
	auto.offset.reset = earliest
{code}

Server configuration:
{code}
[2016-05-23 15:04:51,707] INFO KafkaConfig values:
        advertised.host.name = null
        metric.reporters = []
        quota.producer.default = 9223372036854775807
        offsets.topic.num.partitions = 50
        log.flush.interval.messages = 9223372036854775807
        auto.create.topics.enable = true
        controller.socket.timeout.ms = 30000
        log.flush.interval.ms = null
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        replica.socket.receive.buffer.bytes = 65536
        min.insync.replicas = 2
        replica.fetch.wait.max.ms = 500
        num.recovery.threads.per.data.dir = 1
        ssl.keystore.type = JKS
        default.replication.factor = 3
        ssl.truststore.password = [hidden]
        log.preallocate = false
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        fetch.purgatory.purge.interval.requests = 1000
        ssl.endpoint.identification.algorithm = null
        replica.socket.timeout.ms = 30000
        message.max.bytes = 1000012
        num.io.threads = 10
        offsets.commit.required.acks = -1
        log.flush.offset.checkpoint.interval.ms = 60000
        delete.topic.enable = true
        quota.window.size.seconds = 1
        ssl.truststore.type = JKS
        offsets.commit.timeout.ms = 5000
        quota.window.num = 11
        zookeeper.connect = 127.0.0.1:2181
        authorizer.class.name =
        num.replica.fetchers = 1
        log.retention.ms = null
        log.roll.jitter.hours = 0
        log.cleaner.enable = true
        offsets.load.buffer.size = 5242880
        log.cleaner.delete.retention.ms = 86400000
        ssl.client.auth = none
        controlled.shutdown.max.retries = 3
        queued.max.requests = 500
        offsets.topic.replication.factor = 3
        log.cleaner.threads = 1
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        socket.request.max.bytes = 104857600
        ssl.trustmanager.algorithm = PKIX
        zookeeper.session.timeout.ms = 6000
        log.retention.bytes = -1
        sasl.kerberos.min.time.before.relogin = 60000
        zookeeper.set.acl = false
        connections.max.idle.ms = 600000
        offsets.retention.minutes = 1440
        replica.fetch.backoff.ms = 1000
        inter.broker.protocol.version = 0.9.0.X
        log.retention.hours = 24
        num.partitions = 1
        broker.id.generation.enable = true
        listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        log.roll.ms = null
        log.flush.scheduler.interval.ms = 9223372036854775807
        ssl.cipher.suites = null
        log.index.size.max.bytes = 10485760
        ssl.keymanager.algorithm = SunX509
        security.inter.broker.protocol = PLAINTEXT
        replica.fetch.max.bytes = 1048576
        advertised.port = null
        log.cleaner.dedupe.buffer.size = 134217728
        replica.high.watermark.checkpoint.interval.ms = 5000
        log.cleaner.io.buffer.size = 524288
        sasl.kerberos.ticket.renew.window.factor = 0.8
        zookeeper.connection.timeout.ms = 60000
        controlled.shutdown.retry.backoff.ms = 5000
        log.roll.hours = 168
        log.cleanup.policy = delete
        host.name =
        log.roll.jitter.ms = null
        max.connections.per.ip = 2147483647
        offsets.topic.segment.bytes = 104857600
        background.threads = 10
        quota.consumer.default = 9223372036854775807
        request.timeout.ms = 30000
        log.index.interval.bytes = 4096
        log.dir = /tmp/kafka-logs
        log.segment.bytes = 1073741824
        log.cleaner.backoff.ms = 15000
        offset.metadata.max.bytes = 4096
        ssl.truststore.location = /ssl/server.truststore.jks
        group.max.session.timeout.ms = 30000
        ssl.keystore.password = [hidden]
        zookeeper.sync.time.ms = 2000
        port = 9092
        log.retention.minutes = null
        log.segment.delete.delay.ms = 60000
        log.dirs = /data
        controlled.shutdown.enable = true
        compression.type = producer
        max.connections.per.ip.overrides =
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        auto.leader.rebalance.enable = true
        leader.imbalance.check.interval.seconds = 300
        log.cleaner.min.cleanable.ratio = 0.5
        replica.lag.time.max.ms = 10000
        num.network.threads = 5
        ssl.key.password = [hidden]
        reserved.broker.max.id = 1000
        metrics.num.samples = 2
        socket.send.buffer.bytes = 102400
        ssl.protocol = TLS
        socket.receive.buffer.bytes = 102400
        ssl.keystore.location = /ssl/server.keystore.jks
        replica.fetch.min.bytes = 1
        unclean.leader.election.enable = true
        group.min.session.timeout.ms = 6000
        log.cleaner.io.buffer.load.factor = 0.9
        offsets.retention.check.interval.ms = 600000
        producer.purgatory.purge.interval.requests = 1000
        metrics.sample.window.ms = 30000
        broker.id = 1
        offsets.topic.compression.codec = 0
        log.retention.check.interval.ms = 300000
        advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
        leader.imbalance.per.broker.percentage = 10
 (kafka.server.KafkaConfig)
{code}
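
One thing the two configuration dumps above suggest checking: the consumer runs with security.protocol = SSL and lists 127.0.0.1:9092 in bootstrap.servers, while this broker (broker.id = 1) exposes PLAINTEXT on 9092 and SSL on 9093. The following is a minimal Python sketch of that cross-check, not part of Kafka. The function name is hypothetical, and only the listeners of the one broker shown here are known; the listeners behind ports 9094 and 9096 are not in this report, so they are left unjudged.

```python
def mismatched_endpoints(bootstrap_servers, listeners, security_protocol):
    """Return (endpoint, listener_protocol) pairs where the client's
    security.protocol differs from the protocol of the listener on that port.
    Ports with no known listener are skipped rather than guessed at."""
    port_to_protocol = {}
    for listener in listeners.split(","):
        # A listener looks like "SSL://localhost:9093".
        protocol, _, host_port = listener.strip().partition("://")
        port = int(host_port.rsplit(":", 1)[1])
        port_to_protocol[port] = protocol.upper()

    mismatches = []
    for endpoint in bootstrap_servers:
        port = int(endpoint.rsplit(":", 1)[1])
        listener_protocol = port_to_protocol.get(port)
        if listener_protocol is not None and listener_protocol != security_protocol:
            mismatches.append((endpoint, listener_protocol))
    return mismatches


# Values copied from the consumer and broker configuration in this report.
bootstrap = ["127.0.0.1:9092", "127.0.0.1:9094", "127.0.0.1:9096"]
broker_listeners = "PLAINTEXT://localhost:9092,SSL://localhost:9093"
print(mismatched_endpoints(bootstrap, broker_listeners, "SSL"))
```

With the values from this report, the check flags 127.0.0.1:9092 as a PLAINTEXT listener being contacted by an SSL client.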

Client:
{code}
java.io.IOException: Broken pipe
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:161)
	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
	at org.apache.kafka.common.network.Selector.close(Selector.java:442)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:310)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
{code}

Server:
{code}
[2016-05-23 15:07:16,427] WARN Unexpected error from /127.0.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
        at kafka.network.Processor.run(SocketServer.scala:413)
        at java.lang.Thread.run(Thread.java:745)
{code}
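
The size in the server log above is worth decoding. Kafka frames each request with a 4-byte big-endian size, and 369296128 is 0x16030300, which matches the first four bytes of a TLS record header: content type 22 (handshake), version 3.3 (TLS 1.2), and the high byte of the record length. This is consistent with, though does not prove, a TLS ClientHello arriving on a listener that expects plaintext. A small Python sketch of the decoding follows; the helper name is hypothetical and the check is a heuristic.

```python
import struct

# TLS record content types from the TLS record layer definition.
TLS_CONTENT_TYPES = {
    20: "change_cipher_spec",
    21: "alert",
    22: "handshake",
    23: "application_data",
}


def explain_size_prefix(size):
    """Re-interpret a 4-byte big-endian size prefix as the first four bytes
    of a TLS record header. Heuristic only: a match suggests, but does not
    prove, that TLS bytes were read as a plaintext size field."""
    raw = struct.pack(">i", size)
    content_type, major, minor, _length_high = raw
    looks_like_tls = content_type in TLS_CONTENT_TYPES and major == 3
    return {
        "raw_hex": raw.hex(),
        "content_type": TLS_CONTENT_TYPES.get(content_type),
        "record_version": (major, minor),
        "looks_like_tls": looks_like_tls,
    }


# Size reported in the InvalidReceiveException above.
print(explain_size_prefix(369296128))
```

For the value in the exception, the raw bytes are 16 03 03 00, so the helper reports a handshake record with version (3, 3).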


