Posted to user@storm.apache.org by Idan Fridman <id...@gmail.com> on 2015/02/10 17:29:27 UTC

NullPointerException from deep inside Storm

I am making an asynchronous call to a web service from a bolt.

I open a socket and retrieve the result asynchronously (using the external
AsyncHttpClient library), and after that I emit to the next bolt.

I asked around and read that synchronizing on the outputCollector makes
sure that all acks and callbacks are called from the same thread.

However, after a load test I started to get this:


java.lang.RuntimeException: java.lang.NullPointerException
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
    at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at clojure.lang.RT.intCast(RT.java:1087)
    at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
    at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
    at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)

That's my bolt:

public class AsyncBolt extends BaseRichBolt {

...

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        asyncHttpClient = new AsyncHttpClient();
        outputCollector = collector;
    }

    @Override
    public void execute(final Tuple tuple) {
        asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
            @Override
            public Response onCompleted(Response response) throws Exception {
               ...
                emitTuple(response, tuple);
                return response;
            }
        });
    }

    // we are synchronizing basicOutputCollector
    // because we have callbacks and we need to make sure all acks are
    // called from the same thread
    private void emitTuple(Response response, Tuple tuple) {
        synchronized (outputCollector) {
            outputCollector.emit(tuple, new Values(response));
            outputCollector.ack(tuple);
        }
    }
}

Any leads on this, please?

Thank you.
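
A note on the synchronization assumption in the post above: in plain Java, a synchronized block only guarantees that one thread at a time runs the guarded code; it does not move that code onto a single thread. The following is a minimal, standard-library-only sketch of that point. It does not use Storm, and the class name, lock object and thread names are hypothetical, chosen only for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

class SynchronizedThreadDemo {
    // Hypothetical stand-in for a shared object such as a collector.
    private static final Object lock = new Object();

    /**
     * Starts `workers` threads that each enter the same synchronized block
     * and returns the names of all threads that were observed inside it.
     */
    static Set<String> threadsSeenInsideLock(int workers) throws InterruptedException {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                synchronized (lock) {
                    // Mutual exclusion holds here, but we are still on the caller's own thread.
                    seen.add(Thread.currentThread().getName());
                }
                done.countDown();
            }, "callback-" + i);
            t.start();
        }
        done.await();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(threadsSeenInsideLock(4));
    }
}
```

Each worker thread shows up in the result set, so the guarded calls are serialized but still issued from several different threads.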

Re: NullPointerException from deep inside Storm

Posted by Idan Fridman <id...@gmail.com>.
Hi all,
Could anyone take a look at this? The exception keeps occurring. Any
leads, please?

2015-02-11 10:18 GMT+02:00 Idan Fridman <id...@gmail.com>:

> Hi,
> Here is the almost full bolt I am using which causing that exception:
>
>
> public class AsyncBolt extends BaseRichBolt {
>
>
>     private AsyncHttpClient asyncHttpClient;
>     private OutputCollector outputCollector;
>
>
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         declarer.declare(new Fields("pushMessageResponse"));
>     }
>
>
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
>         asyncHttpClient = new AsyncHttpClient();
>         outputCollector = collector;
>
>     }
>
>
>     @Override
>     public void execute(final Tuple tuple) {
>         final PushMessageRequestDTO pushMessageRequestDTO =
> (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>         String url = "some.url";
>         asyncHttpClient.preparePost(url).execute(new
> AsyncCompletionHandler<Response>() {
>             @Override
>             public Response onCompleted(Response response) throws
> Exception {
>                 PushMessageResponseDTO pushMessageResponseDTO = new
> PushMessageResponseDTO(pushMessageRequestDTO);
>                 emitTuple(pushMessageResponseDTO, tuple);
>                 return response;
>             }
>         });
>     }
>     //we are synchronizing basicOutputCollector
>     // because we have callbacks and we need to make sure all acks are
> called from the same thread
>     private void emitTuple(PushMessageResponseDTO pushMessageResponseDTO,
> Tuple tuple) {
>         synchronized (outputCollector) {
>             outputCollector.emit(tuple, new
> Values(pushMessageResponseDTO));
>             outputCollector.ack(tuple);
>         }
>     }
> }

Re: NullPointerException from deep inside Storm

Posted by Idan Fridman <id...@gmail.com>.
Hi,
Here is the nearly complete bolt I am using, which is causing that exception:


public class AsyncBolt extends BaseRichBolt {

    private AsyncHttpClient asyncHttpClient;
    private OutputCollector outputCollector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("pushMessageResponse"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        asyncHttpClient = new AsyncHttpClient();
        outputCollector = collector;
    }

    @Override
    public void execute(final Tuple tuple) {
        final PushMessageRequestDTO pushMessageRequestDTO =
                (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
        String url = "some.url";
        asyncHttpClient.preparePost(url).execute(new AsyncCompletionHandler<Response>() {
            @Override
            public Response onCompleted(Response response) throws Exception {
                PushMessageResponseDTO pushMessageResponseDTO =
                        new PushMessageResponseDTO(pushMessageRequestDTO);
                emitTuple(pushMessageResponseDTO, tuple);
                return response;
            }
        });
    }

    // we are synchronizing basicOutputCollector
    // because we have callbacks and we need to make sure all acks are
    // called from the same thread
    private void emitTuple(PushMessageResponseDTO pushMessageResponseDTO, Tuple tuple) {
        synchronized (outputCollector) {
            outputCollector.emit(tuple, new Values(pushMessageResponseDTO));
            outputCollector.ack(tuple);
        }
    }
}
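
For comparison with the bolt above, here is one commonly used alternative, sketched under assumptions: the HTTP callback threads only enqueue finished results, and a single owning thread drains the queue and performs every emit and ack. Storm's troubleshooting documentation describes an error with this thread's title as being caused by multiple threads calling methods on the OutputCollector. Whether that is the cause here is not established in this thread. The sketch uses only the Java standard library; HandOffDemo, onCompleted and drain are hypothetical names, and the list append stands in for the real emit and ack calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class HandOffDemo {
    // Results completed on callback threads wait here until the owning thread drains them.
    private final Queue<String> completed = new ConcurrentLinkedQueue<>();
    // Names of the threads that performed an "emit"; recorded for demonstration only.
    final Set<String> emittingThreads = ConcurrentHashMap.newKeySet();
    // Touched only by the draining thread.
    final List<String> emitted = new ArrayList<>();

    /** Called from any callback thread: enqueues the result and never emits. */
    void onCompleted(String result) {
        completed.add(result);
    }

    /** Called only from the owning thread; returns how many results were handled. */
    int drain() {
        int handled = 0;
        String result;
        while ((result = completed.poll()) != null) {
            emittingThreads.add(Thread.currentThread().getName());
            emitted.add(result); // stand-in for emit(...) followed by ack(...)
            handled++;
        }
        return handled;
    }

    /** Simulates `requests` asynchronous completions on a pool, then drains on the calling thread. */
    static HandOffDemo run(int requests) throws InterruptedException {
        HandOffDemo demo = new HandOffDemo();
        ExecutorService callbacks = Executors.newFixedThreadPool(4);
        for (int i = 0; i < requests; i++) {
            final int id = i;
            callbacks.submit(() -> demo.onCompleted("response-" + id));
        }
        callbacks.shutdown();
        callbacks.awaitTermination(10, TimeUnit.SECONDS);
        demo.drain();
        return demo;
    }

    public static void main(String[] args) throws InterruptedException {
        HandOffDemo demo = run(100);
        System.out.println(demo.emitted.size() + " results emitted by " + demo.emittingThreads);
    }
}
```

In a real bolt the drain step would have to run on the thread that calls execute(), for example at the start of each execute() call or on a tick tuple, so completed results wait until the next tuple arrives. That latency trade-off is part of this design.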



2015-02-10 20:27 GMT+02:00 Idan Fridman <id...@gmail.com>:

> Yes I cutted those from this post for the sake simplicity aswell. I can
> past the whole bolt it just has some intern logic

Re: NullPointerException from deep inside Storm

Posted by Idan Fridman <id...@gmail.com>.
Yes, I cut those from this post for the sake of simplicity as well. I can
paste the whole bolt; it just has some internal logic.
On Feb 10, 2015 8:14 PM, "Michael Rose" <mi...@fullcontact.com> wrote:

> Your output fields declarer isn't invoked. Is it empty in your actual
> implementation as well?
>
> Perhaps you can post a gist with your full file?

Re: NullPointerException from deep inside Storm

Posted by Michael Rose <mi...@fullcontact.com>.
Your output fields declarer isn't invoked. Is it empty in your actual
implementation as well?

Perhaps you can post a gist with your full file?

*Michael Rose*
Senior Platform Engineer
*Full*Contact | fullcontact.com
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
m: +1.720.837.1357 | t: @xorlev


All Your Contacts, Updated and In One Place.
Try FullContact for Free
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>

On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <id...@gmail.com> wrote:

> I actually created pojo of mine and emitted it. I edited here the code for
> the sake of simplicity. So you think I should try convert the Object into
> byte/string and then emmit? What about the outputcollector synchronizing ?
> Does it make sense?

Re: NullPointerException from deep inside Storm

Posted by Idan Fridman <id...@gmail.com>.
I actually created a POJO of my own and emitted it. I edited the code here
for the sake of simplicity. So you think I should try converting the object
into a byte array or string and then emit it? What about synchronizing on
the OutputCollector? Does that make sense?
On Feb 10, 2015 8:00 PM, "Michael Rose" <mi...@fullcontact.com> wrote:

> Out of curiosity, have you tried just emitting the response as a
> string/byte array vs. the whole response object?

Re: NullPointerException from deep inside Storm

Posted by Michael Rose <mi...@fullcontact.com>.
Out of curiosity, have you tried just emitting the response as a
string/byte array vs. the whole response object?
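
To illustrate the suggestion above, here is a minimal sketch of flattening a response object into a string or byte array before it is emitted, and rebuilding it on the receiving side. The PushResponse type and its two fields are hypothetical, since the real PushMessageResponseDTO is not shown in this thread. The comma-separated encoding is only one possible choice, and nothing here uses the Storm API.

```java
import java.nio.charset.StandardCharsets;

class ResponsePayloadDemo {
    /** Hypothetical response type with two illustrative fields. */
    static final class PushResponse {
        final String messageId;
        final int statusCode;

        PushResponse(String messageId, int statusCode) {
            this.messageId = messageId;
            this.statusCode = statusCode;
        }
    }

    /** Encodes the response as "statusCode,messageId"; the status comes first so the id may contain commas. */
    static String toLine(PushResponse r) {
        return r.statusCode + "," + r.messageId;
    }

    /** UTF-8 bytes of the line form, suitable for emitting as a plain byte array value. */
    static byte[] toBytes(PushResponse r) {
        return toLine(r).getBytes(StandardCharsets.UTF_8);
    }

    /** Rebuilds the response on the receiving side by splitting at the first comma only. */
    static PushResponse fromBytes(byte[] bytes) {
        String[] parts = new String(bytes, StandardCharsets.UTF_8).split(",", 2);
        return new PushResponse(parts[1], Integer.parseInt(parts[0]));
    }

    public static void main(String[] args) {
        PushResponse back = fromBytes(toBytes(new PushResponse("msg-1", 200)));
        System.out.println(back.statusCode + " " + back.messageId);
    }
}
```

Emitting a plain string or byte array takes custom object serialization out of the picture, which helps narrow down whether the emitted object itself is involved in the failure.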


On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <id...@gmail.com> wrote:

> One note: it's not BasicOutputCollector, but OutputCollector.

Re: NullPointerException from deep inside Storm

Posted by Idan Fridman <id...@gmail.com>.
One note: it's not BasicOutputCollector, but OutputCollector.
