Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2016/05/23 15:26:12 UTC
[jira] [Commented] (KAFKA-3746) InvalidReceiveException when connecting to broker over SSL
[ https://issues.apache.org/jira/browse/KAFKA-3746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15296514#comment-15296514 ]
Ismael Juma commented on KAFKA-3746:
------------------------------------
In your server configuration you have:
listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
In your consumer configuration, you have:
bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
security.protocol = SSL
So your consumer is connecting to port 9092 with security.protocol = SSL, even though the broker's listener on that port is PLAINTEXT. That is likely the reason for the error. On this broker the SSL listener is on port 9093, so an SSL client should use that port instead.
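As a rough check of this explanation, the size reported in the broker's InvalidReceiveException (369296128) can be decoded back into the bytes it came from. The sketch below assumes the broker read the first four bytes on the connection as a big-endian request size, which is how Kafka frames requests. The class and method names are made up for illustration and are not part of Kafka.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Kafka: decodes a reported "receive size"
// back into the four bytes the broker read from the socket.
class SizePrefixCheck {

    // Returns the four big-endian bytes that encode the given size.
    static byte[] sizePrefixBytes(int size) {
        return ByteBuffer.allocate(4).putInt(size).array();
    }

    // True when the bytes start like a TLS handshake record:
    // content type 0x16 (handshake) followed by major version 0x03.
    static boolean looksLikeTlsHandshake(byte[] b) {
        return b.length >= 2 && b[0] == 0x16 && b[1] == 0x03;
    }

    // Formats bytes as space-separated two-digit hex values.
    static String toHex(byte[] b) {
        StringBuilder sb = new StringBuilder();
        for (byte x : b) {
            sb.append(String.format("%02x ", x));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        int reported = 369296128; // size from the broker log in this issue
        byte[] prefix = sizePrefixBytes(reported);
        System.out.println(toHex(prefix));                 // prints "16 03 03 00"
        System.out.println(looksLikeTlsHandshake(prefix)); // prints "true"
    }
}
```

The bytes 16 03 03 match the start of a TLS record (content type 0x16 is a handshake message, and 03 03 is the version value used by TLS 1.2). That is consistent with an SSL client sending its handshake to a PLAINTEXT port, where the broker treats those bytes as a request size and rejects it as larger than socket.request.max.bytes (104857600).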
> InvalidReceiveException when connecting to broker over SSL
> ----------------------------------------------------------
>
> Key: KAFKA-3746
> URL: https://issues.apache.org/jira/browse/KAFKA-3746
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.9.0.1
> Environment: 3-node cluster on localhost
> Reporter: Sergey Alaev
>
> When calling KafkaConsumer.poll(), the server closes the connection with an InvalidReceiveException. Strangely, it is reproduced only with SSL enabled between consumer and broker. We do not use SSL for inter-broker communication.
> Consumer configuration:
> {code}
> [2016-05-23T15:07:14.806Z] [] [kafka-thread] [ConsumerConfig] [] [] [] [INFO]: ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> value.deserializer = class com.confyrm.eps.disp.kafka.SignalDeserializer
> group.id = sds
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [127.0.0.1:9092, 127.0.0.1:9094, 127.0.0.1:9096]
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.keystore.type = JKS
> ssl.trustmanager.algorithm = PKIX
> enable.auto.commit = false
> ssl.key.password = [hidden]
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> ssl.truststore.password = [hidden]
> session.timeout.ms = 30000
> metrics.num.samples = 2
> client.id =
> ssl.endpoint.identification.algorithm = null
> key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
> ssl.protocol = TLS
> check.crcs = true
> request.timeout.ms = 40000
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = src/main/resources/ssl/kafka.client.keystore.jks
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 1000
> receive.buffer.bytes = 32768
> ssl.cipher.suites = null
> ssl.truststore.type = JKS
> security.protocol = SSL
> ssl.truststore.location = src/main/resources/ssl/kafka.client.truststore.jks
> ssl.keystore.password = [hidden]
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> auto.offset.reset = earliest
> {code}
> Server configuration:
> {code}
> [2016-05-23 15:04:51,707] INFO KafkaConfig values:
> advertised.host.name = null
> metric.reporters = []
> quota.producer.default = 9223372036854775807
> offsets.topic.num.partitions = 50
> log.flush.interval.messages = 9223372036854775807
> auto.create.topics.enable = true
> controller.socket.timeout.ms = 30000
> log.flush.interval.ms = null
> principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
> replica.socket.receive.buffer.bytes = 65536
> min.insync.replicas = 2
> replica.fetch.wait.max.ms = 500
> num.recovery.threads.per.data.dir = 1
> ssl.keystore.type = JKS
> default.replication.factor = 3
> ssl.truststore.password = [hidden]
> log.preallocate = false
> sasl.kerberos.principal.to.local.rules = [DEFAULT]
> fetch.purgatory.purge.interval.requests = 1000
> ssl.endpoint.identification.algorithm = null
> replica.socket.timeout.ms = 30000
> message.max.bytes = 1000012
> num.io.threads = 10
> offsets.commit.required.acks = -1
> log.flush.offset.checkpoint.interval.ms = 60000
> delete.topic.enable = true
> quota.window.size.seconds = 1
> ssl.truststore.type = JKS
> offsets.commit.timeout.ms = 5000
> quota.window.num = 11
> zookeeper.connect = 127.0.0.1:2181
> authorizer.class.name =
> num.replica.fetchers = 1
> log.retention.ms = null
> log.roll.jitter.hours = 0
> log.cleaner.enable = true
> offsets.load.buffer.size = 5242880
> log.cleaner.delete.retention.ms = 86400000
> ssl.client.auth = none
> controlled.shutdown.max.retries = 3
> queued.max.requests = 500
> offsets.topic.replication.factor = 3
> log.cleaner.threads = 1
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> socket.request.max.bytes = 104857600
> ssl.trustmanager.algorithm = PKIX
> zookeeper.session.timeout.ms = 6000
> log.retention.bytes = -1
> sasl.kerberos.min.time.before.relogin = 60000
> zookeeper.set.acl = false
> connections.max.idle.ms = 600000
> offsets.retention.minutes = 1440
> replica.fetch.backoff.ms = 1000
> inter.broker.protocol.version = 0.9.0.X
> log.retention.hours = 24
> num.partitions = 1
> broker.id.generation.enable = true
> listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> log.roll.ms = null
> log.flush.scheduler.interval.ms = 9223372036854775807
> ssl.cipher.suites = null
> log.index.size.max.bytes = 10485760
> ssl.keymanager.algorithm = SunX509
> security.inter.broker.protocol = PLAINTEXT
> replica.fetch.max.bytes = 1048576
> advertised.port = null
> log.cleaner.dedupe.buffer.size = 134217728
> replica.high.watermark.checkpoint.interval.ms = 5000
> log.cleaner.io.buffer.size = 524288
> sasl.kerberos.ticket.renew.window.factor = 0.8
> zookeeper.connection.timeout.ms = 60000
> controlled.shutdown.retry.backoff.ms = 5000
> log.roll.hours = 168
> log.cleanup.policy = delete
> host.name =
> log.roll.jitter.ms = null
> max.connections.per.ip = 2147483647
> offsets.topic.segment.bytes = 104857600
> background.threads = 10
> quota.consumer.default = 9223372036854775807
> request.timeout.ms = 30000
> log.index.interval.bytes = 4096
> log.dir = /tmp/kafka-logs
> log.segment.bytes = 1073741824
> log.cleaner.backoff.ms = 15000
> offset.metadata.max.bytes = 4096
> ssl.truststore.location = /ssl/server.truststore.jks
> group.max.session.timeout.ms = 30000
> ssl.keystore.password = [hidden]
> zookeeper.sync.time.ms = 2000
> port = 9092
> log.retention.minutes = null
> log.segment.delete.delay.ms = 60000
> log.dirs = /data
> controlled.shutdown.enable = true
> compression.type = producer
> max.connections.per.ip.overrides =
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
> auto.leader.rebalance.enable = true
> leader.imbalance.check.interval.seconds = 300
> log.cleaner.min.cleanable.ratio = 0.5
> replica.lag.time.max.ms = 10000
> num.network.threads = 5
> ssl.key.password = [hidden]
> reserved.broker.max.id = 1000
> metrics.num.samples = 2
> socket.send.buffer.bytes = 102400
> ssl.protocol = TLS
> socket.receive.buffer.bytes = 102400
> ssl.keystore.location = /ssl/server.keystore.jks
> replica.fetch.min.bytes = 1
> unclean.leader.election.enable = true
> group.min.session.timeout.ms = 6000
> log.cleaner.io.buffer.load.factor = 0.9
> offsets.retention.check.interval.ms = 600000
> producer.purgatory.purge.interval.requests = 1000
> metrics.sample.window.ms = 30000
> broker.id = 1
> offsets.topic.compression.codec = 0
> log.retention.check.interval.ms = 300000
> advertised.listeners = PLAINTEXT://localhost:9092,SSL://localhost:9093
> leader.imbalance.per.broker.percentage = 10
> (kafka.server.KafkaConfig)
> {code}
> Client:
> {code}
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
> at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194)
> at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:161)
> at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:45)
> at org.apache.kafka.common.network.Selector.close(Selector.java:442)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:310)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorKnown(AbstractCoordinator.java:184)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:886)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> {code}
> Server:
> {code}
> [2016-05-23 15:07:16,427] WARN Unexpected error from /127.0.0.1; closing connection (org.apache.kafka.common.network.Selector)
> org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 369296128 larger than 104857600)
> at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
> at kafka.network.Processor.run(SocketServer.scala:413)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)