Posted to dev@kafka.apache.org by Jaikiran Pai <ja...@gmail.com> on 2015/01/19 18:07:43 UTC

NullPointerException in RequestSendThread

I often see the following exception while running some tests 
(ProducerFailureHandlingTest.testNoResponse is one such instance):


[2015-01-19 22:30:24,257] ERROR [Controller-0-to-broker-1-send-thread], Controller 0 fails to send a request to broker id:1,host:localhost,port:56729 (kafka.controller.RequestSendThread:103)
java.lang.NullPointerException
     at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:150)
     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


Looking at the code in question, I can see that the NPE can be triggered 
when "receive" is null, which can happen if "isRunning" is false 
(i.e. a shutdown has been requested). The fix to prevent this seems 
straightforward:

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index eb492f0..10f4c5a 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,20 +144,22 @@ class RequestSendThread(val controllerId: Int,
                Utils.swallow(Thread.sleep(300))
            }
          }
-        var response: RequestOrResponse = null
-        request.requestId.get match {
-          case RequestKeys.LeaderAndIsrKey =>
-            response = LeaderAndIsrResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaKey =>
-            response = StopReplicaResponse.readFrom(receive.buffer)
-          case RequestKeys.UpdateMetadataKey =>
-            response = UpdateMetadataResponse.readFrom(receive.buffer)
-        }
-        stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
-                                  .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))
+        if (receive != null) {
+          var response: RequestOrResponse = null
+          request.requestId.get match {
+            case RequestKeys.LeaderAndIsrKey =>
+              response = LeaderAndIsrResponse.readFrom(receive.buffer)
+            case RequestKeys.StopReplicaKey =>
+              response = StopReplicaResponse.readFrom(receive.buffer)
+            case RequestKeys.UpdateMetadataKey =>
+              response = UpdateMetadataResponse.readFrom(receive.buffer)
+          }
+          stateChangeLogger.trace("Controller %d epoch %d received response %s for a request sent to broker %s"
+            .format(controllerId, controllerContext.epoch, response.toString, toBroker.toString))

-        if(callback != null) {
-          callback(response)
+          if (callback != null) {
+            callback(response)
+          }
          }
        }


However, can this really be considered a fix, or would it just be hiding 
the real issue, with something more still needing to be done in this 
case? I'm on trunk, FWIW.
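
To make the question concrete, here is a minimal, self-contained sketch of
the same guard written with Option instead of a nullable "receive". It is
not the Kafka code: Receive, sendAndReceive, handle and NullGuardSketch are
hypothetical names made up for illustration, and the retry loop is only an
approximation of what RequestSendThread does. The assumption is that the
loop can exit without a response once a shutdown has been requested, and
that the caller should then skip decoding and the callback.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the response payload read from the broker socket.
final case class Receive(buffer: String)

object NullGuardSketch {
  // Approximates the retry loop: keep trying while the thread is running and
  // nothing has been received yet. If a shutdown is requested first, the
  // result is None instead of a null reference.
  def sendAndReceive(isRunning: AtomicBoolean,
                     attempt: () => Option[Receive]): Option[Receive] = {
    var result: Option[Receive] = None
    while (isRunning.get && result.isEmpty) {
      result = attempt()
    }
    result
  }

  // Runs the callback only when a response was actually received.
  // Returns true if the response was handled, false if it was skipped.
  def handle(receive: Option[Receive], callback: String => Unit): Boolean =
    receive match {
      case Some(r) =>
        if (callback != null) callback(r.buffer)
        true
      case None =>
        false
    }

  def main(args: Array[String]): Unit = {
    val running = new AtomicBoolean(true)
    // The only attempt fails and a shutdown is requested at the same time,
    // so the loop exits without a response.
    val received = sendAndReceive(running, () => { running.set(false); None })
    println(handle(received, s => println(s)))
  }
}
```

With Option the "no response because we are shutting down" case is visible
in the type, so the caller has to decide what to do with it rather than
running into an NPE. Whether silently skipping the callback is the right
behaviour is the same open question as with the null check above.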


-Jaikiran

Re: NullPointerException in RequestSendThread

Posted by Jaikiran Pai <ja...@gmail.com>.
JIRA created (https://issues.apache.org/jira/browse/KAFKA-1883) and a patch 
submitted for review. Thanks, Guozhang.

-Jaikiran

On Tuesday 20 January 2015 05:53 AM, Guozhang Wang wrote:
> Hi Jaikiran,
>
> This is a real bug, could you file a JIRA?
>
> As for the fix, I think your proposal would be the right way to fix it.
>
> Guozhang
>


Re: NullPointerException in RequestSendThread

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Jaikiran,

This is a real bug, could you file a JIRA?

As for the fix, I think your proposal would be the right way to fix it.

Guozhang




-- 
-- Guozhang