Posted to jira@kafka.apache.org by "Manikumar (JIRA)" <ji...@apache.org> on 2018/04/18 03:07:00 UTC

[jira] [Commented] (KAFKA-6404) OldConsumer FetchRequest apiVersion not match resulting in broker RequestHandler socket leak

    [ https://issues.apache.org/jira/browse/KAFKA-6404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16441820#comment-16441820 ] 

Manikumar commented on KAFKA-6404:
----------------------------------

SocketServer error handling was improved in KAFKA-5607, so this should already be handled by that change.

> OldConsumer FetchRequest apiVersion not match resulting in broker RequestHandler socket leak
> --------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6404
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6404
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.10.0.1
>            Reporter: Yu Gan
>            Priority: Critical
>
> *kafka broker version*: 0.10.0.1
> *cluster info*: 200+ nodes, no ACLs; any client on the same LAN can connect
> *situation*: someone runs a newer release (such as 0.11.x) of bin/kafka-console-consumer.sh with the "--zookeeper" parameter to continuously consume a topic whose partitions are spread across all the brokers
> *phenomenon*: 
> 1. Broker server log, with errors like:
> 1) Connection to 2 was disconnected before the response was read;
> 2) Shrinking ISR for partition [abc, 21] from 33,13,14 to 33;
> 3) ERROR Processor got uncaught exception. (kafka.network.Processor) java.nio.BufferUnderflowException
> 2. Regular consumers stuck in a rebalance loop, with errors like:
> 1) c.p.b.f.l.c.FiberTopoWorkerThread : got uncaught exception
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
> 2) java.lang.IllegalStateException: Correlation id for response (1246203) does not match request (1246122)
> *bad results*: the Kafka brokers become unhealthy
> *root cause*: 
> 1) Since 0.10.1, the old consumer pins the fetch request to version 3 in ConsumerFetcherThread.scala:
> {code:java}
> private val fetchRequestBuilder = new FetchRequestBuilder().
>     clientId(clientId).
>     replicaId(Request.OrdinaryConsumerId).
>     maxWait(config.fetchWaitMaxMs).
>     minBytes(config.fetchMinBytes).
>     requestVersion(3) // for now, the old consumer is pinned to the old message format through the fetch request
> {code}
> but in 0.10.0.1, FetchRequest.CurrentVersion is 2. Fetch request v3 (introduced in 0.10.1 by KIP-74) added a top-level "max_bytes" field that FetchRequest.readFrom in 0.10.0.1 knows nothing about, so parsing a v3 request throws a "BufferUnderflowException" (a minimal illustration follows the code below):
> {code:java}
>   def readFrom(buffer: ByteBuffer): FetchRequest = {
>     val versionId = buffer.getShort
>     val correlationId = buffer.getInt
>     val clientId = readShortString(buffer)
>     val replicaId = buffer.getInt
>     val maxWait = buffer.getInt
>     val minBytes = buffer.getInt
>     val topicCount = buffer.getInt
>     val pairs = (1 to topicCount).flatMap(_ => {
>       val topic = readShortString(buffer)
>       val partitionCount = buffer.getInt
>       (1 to partitionCount).map(_ => {
>         val partitionId = buffer.getInt
>         val offset = buffer.getLong
>         val fetchSize = buffer.getInt
>         (TopicAndPartition(topic, partitionId), PartitionFetchInfo(offset, fetchSize))
>       })
>     })
>     FetchRequest(versionId, correlationId, clientId, replicaId, maxWait, minBytes, Map(pairs:_*))
>   }
> {code}
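> A minimal, self-contained sketch (not Kafka code; field values are made up) of why the v2 parser underflows on a v3 request: it lands on the new max_bytes field where it expects the topic count, reads a huge count, and quickly runs off the end of the buffer:
> {code:java}
> import java.nio.ByteBuffer
> 
> object FetchVersionMismatchDemo extends App {
>   // Write the fixed-size header of a v3 fetch request with an empty topic array.
>   val buf = ByteBuffer.allocate(64)
>   buf.putShort(3)               // version_id = 3 (sent by the 0.10.1+ old consumer)
>   buf.putInt(1)                 // correlation_id
>   buf.putShort(0)               // client_id: empty short string
>   buf.putInt(-1)                // replica_id (ordinary consumer)
>   buf.putInt(100)               // max_wait_time
>   buf.putInt(1)                 // min_bytes
>   buf.putInt(50 * 1024 * 1024)  // max_bytes: NEW in v3, unknown to the v2 parser
>   buf.putInt(0)                 // topic count
>   buf.flip()
> 
>   // Parse it the way the 0.10.0.1 broker does (v2 layout, no max_bytes field).
>   buf.getShort                              // version_id
>   buf.getInt                                // correlation_id
>   val clientIdLen = buf.getShort            // client_id length
>   buf.position(buf.position + clientIdLen)  // skip client_id bytes
>   buf.getInt; buf.getInt; buf.getInt        // replica_id, max_wait_time, min_bytes
>   val topicCount = buf.getInt               // actually reads max_bytes = 52428800!
>   println(s"parsed topic count: $topicCount")
>   (1 to topicCount).foreach { _ =>
>     buf.getShort                // simulated per-topic read: within a few
>   }                             // iterations this throws java.nio.BufferUnderflowException
> }
> {code}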
> 2) when FetchRequest.readFrom fails with a throwable such as "BufferUnderflowException", which matches neither InvalidRequestException nor SchemaException, the socket is never closed (see the sketch after the code below);
> SocketServer.processCompletedReceives:
> {code:java}
>   private def processCompletedReceives() {
>     selector.completedReceives.asScala.foreach { receive =>
>       try {
>         val openChannel = selector.channel(receive.source)
>         // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
>         val openOrClosingChannel = if (openChannel != null) openChannel else selector.closingChannel(receive.source)
>         val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, openOrClosingChannel.principal.getName), openOrClosingChannel.socketAddress)
>         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session,
>           buffer = receive.payload, startTimeNanos = time.nanoseconds,
>           listenerName = listenerName, securityProtocol = securityProtocol)
>         requestChannel.sendRequest(req)
>         selector.mute(receive.source)
>       } catch {
>         case e @ (_: InvalidRequestException | _: SchemaException) =>
>           // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
>           error(s"Closing socket for ${receive.source} because of error", e)
>           close(selector, receive.source)
>       }
>     }
>   }
> {code}
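> An alternative fix (closer in spirit to the SocketServer error-handling improvements later made in KAFKA-5607, though this sketch is illustrative and not the actual patch) is to broaden the catch so any parse failure closes the socket. Here parseRequest and closeSocket are hypothetical stand-ins for FetchRequest.readFrom and close(selector, receive.source):
> {code:java}
> import java.nio.BufferUnderflowException
> 
> object BroadCatchSketch extends App {
>   // Hypothetical stand-ins for the real parse and socket-close calls.
>   def parseRequest(): Unit = throw new BufferUnderflowException // e.g. a v3 request on a v2 broker
>   def closeSocket(): Unit = println("socket closed")
> 
>   try parseRequest()
>   catch {
>     // Matching Throwable rather than only InvalidRequestException | SchemaException
>     // guarantees the connection is released on any malformed request.
>     case e: Throwable =>
>       println(s"Closing socket because of error: $e")
>       closeSocket()
>   }
> }
> {code}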
> *workaround, but not the optimal fix*:
> wrap the parse and rethrow a known InvalidRequestException (or SchemaException, arguably more suitable) in RequestChannel.scala, so that the existing catch in processCompletedReceives matches it and closes the socket:
> {code:java}
>     /*// TODO: this will be removed once we migrated to client-side format
>     // for server-side request / response format
>     // NOTE: this map only includes the server-side request/response handlers. Newer
>     // request types should only use the client-side versions which are parsed with
>     // o.a.k.common.requests.AbstractRequest.getRequest()
>     private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
>       Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
>         ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
>       )
>     // TODO: this will be removed once we migrated to client-side format
>     val requestObj =
>       keyToNameAndDeserializerMap.get(requestId).map(readFrom => readFrom(buffer)).orNull*/
>     val requestObj: RequestOrResponse = requestId match {
>       case ApiKeys.FETCH.id => getFetchRequest()
>       case ApiKeys.CONTROLLED_SHUTDOWN_KEY.id => ControlledShutdownRequest.readFrom(buffer)
>       case _ => null
>     }
>     def getFetchRequest(): FetchRequest = {
>       try {
>         FetchRequest.readFrom(buffer)
>       } catch {
>         case ex: Throwable =>
>           throw new InvalidRequestException(s"Error parsing FetchRequest (API key $requestId); the request version may not match the server's supported version ${FetchRequest.CurrentVersion}", ex)
>       }
>     }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)