Posted to dev@kafka.apache.org by Jaikiran Pai <ja...@gmail.com> on 2015/01/19 18:07:43 UTC
NullPointerException in RequestSendThread
I often see the following exception while running some tests
(ProducerFailureHandlingTest.testNoResponse is one such instance):
[2015-01-19 22:30:24,257] ERROR [Controller-0-to-broker-1-send-thread],
Controller 0 fails to send a request to broker
id:1,host:localhost,port:56729 (kafka.controller.RequestSendThread:103)
java.lang.NullPointerException
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:150)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Looking at the code in question, I can see that the NPE can be triggered
when "receive" is null, which can happen if "isRunning" is false
(i.e. a shutdown has been requested). The fix to prevent this seems
straightforward:
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..10f4c5a 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int,
Utils.swallow(Thread.sleep(300))
}
}
- var response: RequestOrResponse = null
- request.requestId.get match {
- case RequestKeys.LeaderAndIsrKey =>
- response = LeaderAndIsrResponse.readFrom(receive.buffer)
- case RequestKeys.StopReplicaKey =>
- response = StopReplicaResponse.readFrom(receive.buffer)
- case RequestKeys.UpdateMetadataKey =>
- response = UpdateMetadataResponse.readFrom(receive.buffer)
- }
- stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
- .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+ if (receive != null) {
+ var response: RequestOrResponse = null
+ request.requestId.get match {
+ case RequestKeys.LeaderAndIsrKey =>
+ response = LeaderAndIsrResponse.readFrom(receive.buffer)
+ case RequestKeys.StopReplicaKey =>
+ response = StopReplicaResponse.readFrom(receive.buffer)
+ case RequestKeys.UpdateMetadataKey =>
+ response = UpdateMetadataResponse.readFrom(receive.buffer)
+ }
+ stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+ .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
- if(callback != null) {
- callback(response)
+ if (callback != null) {
+ callback(response)
+ }
}
}
However, can this really be considered a fix, or would it just hide
the real issue? Would something more have to be done in this case?
I'm on trunk, FWIW.
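To make the question concrete outside of Kafka, here is a minimal sketch of the same situation: a loop that only produces a result while a running flag is set, and a handler that skips processing when nothing was received. All names here (FakeReceive, NullGuardSketch, sendAndReceive, handle) are hypothetical stand-ins for illustration, not the actual Kafka classes, and wrapping the value in Option is just one way to write the "receive != null" guard from the patch.

```scala
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the received network response; not Kafka's class.
final case class FakeReceive(buffer: ByteBuffer)

object NullGuardSketch {
  // Mimics a send/receive loop that exits without a result
  // once a shutdown has been requested (isRunning == false).
  def sendAndReceive(isRunning: AtomicBoolean): FakeReceive = {
    var receive: FakeReceive = null
    var done = false
    while (isRunning.get() && !done) {
      receive = FakeReceive(ByteBuffer.wrap(Array[Byte](42)))
      done = true
    }
    receive // still null if isRunning was already false
  }

  // Option(null) is None, so the body only runs when a response arrived.
  // This is equivalent to guarding with "if (receive != null)".
  def handle(receive: FakeReceive): Option[Byte] =
    Option(receive).map(r => r.buffer.get(0))
}
```

With the flag set, handle returns the first byte of the response; with the flag cleared, it returns None instead of throwing. Whether silently skipping the response is acceptable during shutdown is exactly the open question above.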
-Jaikiran
Re: NullPointerException in RequestSendThread
Posted by Jaikiran Pai <ja...@gmail.com>.
JIRA created https://issues.apache.org/jira/browse/KAFKA-1883 and patch
submitted for review. Thanks Guozhang.
-Jaikiran
On Tuesday 20 January 2015 05:53 AM, Guozhang Wang wrote:
> Hi Jaikiran,
>
> This is a real bug, could you file a JIRA?
>
> As for the fix, I think your proposal would be the right way to fix it.
>
> Guozhang
Re: NullPointerException in RequestSendThread
Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jaikiran,
This is a real bug, could you file a JIRA?
As for the fix, I think your proposal would be the right way to fix it.
Guozhang
--
-- Guozhang