You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Ismael Juma (JIRA)" <ji...@apache.org> on 2017/01/19 18:48:27 UTC

[jira] [Commented] (KAFKA-4670) Kafka Consumer should validate FetchResponse

    [ https://issues.apache.org/jira/browse/KAFKA-4670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15830402#comment-15830402 ] 

Ismael Juma commented on KAFKA-4670:
------------------------------------

Seems like a duplicate of KAFKA-2512. KAFKA-4493 is also related.

> Kafka Consumer should validate FetchResponse
> --------------------------------------------
>
>                 Key: KAFKA-4670
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4670
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 0.10.2.0
>            Reporter: Roger Hoover
>            Assignee: Jason Gustafson
>            Priority: Minor
>
> As a negative test case, I purposefully configured a bad advertised listener endpoint.  
> {code}
> advertised.listeners=PLAINTEXT://www.google.com:80
> {code}
> This causes the Consumer to over-allocate and run out of memory.
> {quote}
> [2017-01-18 10:03:03,866] DEBUG Sending metadata request (type=MetadataRequest, topics=foo) to node -1 (org.apache.kafka.clients.NetworkClient)
> [2017-01-18 10:03:03,870] DEBUG Updated cluster metadata version 2 to Cluster(id = oerqPfCuTCKYUUaWdFUSVQ, nodes = [www.google.com:80 (id: 0 rack: null)], partitions = [Partition(topic = foo, partition = 0, leader = 0, replicas = [0], isr = [0])]) (org.apache.kafka.clients.Metadata)
> [2017-01-18 10:03:03,871] DEBUG Received group coordinator response ClientResponse(receivedTimeMs=1484762583870, latencyMs=88, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, responseBody={error_code=0,coordinator={node_id=0,host=www.google.com,port=80}}) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-01-18 10:03:03,871] INFO Discovered coordinator www.google.com:80 (id: 2147483647 rack: null) for group console-consumer-64535. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-01-18 10:03:03,871] DEBUG Initiating connection to node 2147483647 at www.google.com:80. (org.apache.kafka.clients.NetworkClient)
> [2017-01-18 10:03:03,915] INFO Revoking previously assigned partitions [] for group console-consumer-64535 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> [2017-01-18 10:03:03,915] INFO (Re-)joining group console-consumer-64535 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-01-18 10:03:03,917] DEBUG Sending JoinGroup ((type: JoinGroupRequest, groupId=console-consumer-64535, sessionTimeout=10000, rebalanceTimeout=300000, memberId=, protocolType=consumer, groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@564fabc8)) to coordinator www.google.com:80 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
> [2017-01-18 10:03:03,932] DEBUG Created socket with SO_RCVBUF = 66646, SO_SNDBUF = 131874, SO_TIMEOUT = 0 to node 2147483647 (org.apache.kafka.common.network.Selector)
> [2017-01-18 10:03:03,932] DEBUG Completed connection to node 2147483647.  Fetching API versions. (org.apache.kafka.clients.NetworkClient)
> [2017-01-18 10:03:03,932] DEBUG Initiating API versions fetch from node 2147483647. (org.apache.kafka.clients.NetworkClient)
> [2017-01-18 10:03:03,990] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
> java.lang.OutOfMemoryError: Java heap space
> 	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
> 	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
> 	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
> 	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
> 	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
> 	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
> 	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
> 	at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
> 	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:346)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:331)
> 	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:300)
> 	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:261)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1025)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:990)
> 	at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:55)
> 	at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:69)
> 	at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:50)
> 	at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {quote}
> It seems like the consumer should validate responses better?  It could check that the version is present/valid before trusting the messageset size bytes.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)