You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Idan Fridman <id...@gmail.com> on 2015/02/10 17:29:27 UTC
NullPointerException from deep inside Storm
I am opening async call to a webservice from a bolt.
I'am opening socket and retrieving the result asynchronous(using external
AsycHttpClient library) and after that I am emitting to the next bolt
I asked and read that if I synchronized the outputCollector it will make
sure all that all acks and callbacks will be called from the same Thread.
However after load-test I started to get this:
java.lang.RuntimeException: java.lang.NullPointerException at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at
backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException at
clojure.lang.RT.intCast(RT.java:1087) at
backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
at
backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
at
backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
Thats my bolt:
public class AsyncBolt extends BaseRichBolt {
...
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
asyncHttpClient = new AsyncHttpClient();
outputCollector = collector;
}
@Override
public void execute(final Tuple tuple) {
asyncHttpClient.preparePost(url).execute(new
AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(Response response) throws Exception
{
...
emitTuple(response, tuple);
return response;
}
});
}
//we are synchronizing basicOutputCollector
// because we have callbacks and we need to make sure all acks are
called from the same thread
private void emitTuple(Response response, Tuple tuple) {
synchronized (outputCollector) {
outputCollector.emit(tuple, new Values(response));
outputCollector.ack(tuple);
}
}
}
Please any leads about that?
Thank you.
Re: NullPointerException from deep inside Storm
Posted by Idan Fridman <id...@gmail.com>.
Hi All,
I wonder if anyone could take a look into it? this exception is keep
occurring to me. any leads please?
2015-02-11 10:18 GMT+02:00 Idan Fridman <id...@gmail.com>:
> Hi,
> Here is the almost full bolt I am using which causing that exception:
>
>
> public class AsyncBolt extends BaseRichBolt {
>
>
> private AsyncHttpClient asyncHttpClient;
> private OutputCollector outputCollector;
>
>
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> declarer.declare(new Fields("pushMessageResponse"));
> }
>
>
>
> @Override
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> asyncHttpClient = new AsyncHttpClient();
> outputCollector = collector;
>
> }
>
>
> @Override
> public void execute(final Tuple tuple) {
> final PushMessageRequestDTO pushMessageRequestDTO =
> (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
> String url = "some.url";
> asyncHttpClient.preparePost(url).execute(new
> AsyncCompletionHandler<Response>() {
> @Override
> public Response onCompleted(Response response) throws
> Exception {
> PushMessageResponseDTO pushMessageResponseDTO = new
> PushMessageResponseDTO(pushMessageRequestDTO);
> emitTuple(pushMessageResponseDTO, tuple);
> return response;
> }
> });
> }
> //we are synchronizing basicOutputCollector
> // because we have callbacks and we need to make sure all acks are
> called from the same thread
> private void emitTuple(PushMessageResponseDTO pushMessageResponseDTO,
> Tuple tuple) {
> synchronized (outputCollector) {
> outputCollector.emit(tuple, new
> Values(pushMessageResponseDTO));
> outputCollector.ack(tuple);
> }
> }
> }
>
>
>
> 2015-02-10 20:27 GMT+02:00 Idan Fridman <id...@gmail.com>:
>
>> Yes I cutted those from this post for the sake simplicity aswell. I can
>> past the whole bolt it just has some intern logic
>> On Feb 10, 2015 8:14 PM, "Michael Rose" <mi...@fullcontact.com> wrote:
>>
>>> Your output fields declarer isn't invoked. Is it empty in your actual
>>> implementation as well?
>>>
>>> Perhaps you can post a gist with your full file?
>>>
>>> *Michael Rose*
>>> Senior Platform Engineer
>>> *Full*Contact | fullcontact.com
>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>> m: +1.720.837.1357 | t: @xorlev
>>>
>>>
>>> All Your Contacts, Updated and In One Place.
>>> Try FullContact for Free
>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>
>>> On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <id...@gmail.com>
>>> wrote:
>>>
>>>> I actually created pojo of mine and emitted it. I edited here the code
>>>> for the sake of simplicity. So you think I should try convert the Object
>>>> into byte/string and then emmit? What about the outputcollector
>>>> synchronizing ? Does it make sense?
>>>> On Feb 10, 2015 8:00 PM, "Michael Rose" <mi...@fullcontact.com>
>>>> wrote:
>>>>
>>>>> Out of curiosity, have you tried just emitting the response as a
>>>>> string/byte array vs. the whole response object?
>>>>>
>>>>> *Michael Rose*
>>>>> Senior Platform Engineer
>>>>> *Full*Contact | fullcontact.com
>>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>>> m: +1.720.837.1357 | t: @xorlev
>>>>>
>>>>>
>>>>> All Your Contacts, Updated and In One Place.
>>>>> Try FullContact for Free
>>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>>>
>>>>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <id...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> One note: it's not BasicOutputCollector. but OutputCollector
>>>>>>
>>>>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
>>>>>>
>>>>>>> I am opening async call to a webservice from a bolt.
>>>>>>>
>>>>>>> I'am opening socket and retrieving the result asynchronous(using
>>>>>>> external AsycHttpClient library) and after that I am emitting to the next
>>>>>>> bolt
>>>>>>>
>>>>>>> I asked and read that if I synchronized the outputCollector it will
>>>>>>> make sure all that all acks and callbacks will be called from the same
>>>>>>> Thread.
>>>>>>>
>>>>>>> However after load-test I started to get this:
>>>>>>>
>>>>>>>
>>>>>>> java.lang.RuntimeException: java.lang.NullPointerException at
>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>>> at
>>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>>>> at
>>>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>>>> at
>>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>>>>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>>>>>> Caused by: java.lang.NullPointerException at
>>>>>>> clojure.lang.RT.intCast(RT.java:1087) at
>>>>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>>>>> at
>>>>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>>>>> at
>>>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>>>
>>>>>>> Thats my bolt:
>>>>>>>
>>>>>>> public class AsyncBolt extends BaseRichBolt {
>>>>>>>
>>>>>>> ...
>>>>>>>
>>>>>>> @Override
>>>>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>> public void prepare(Map stormConf, TopologyContext context,
>>>>>>> OutputCollector collector) {
>>>>>>> asyncHttpClient = new AsyncHttpClient();
>>>>>>> outputCollector = collector;
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> @Override
>>>>>>> public void execute(final Tuple tuple) {
>>>>>>>
>>>>>>> asyncHttpClient.preparePost(url).execute(new
>>>>>>> AsyncCompletionHandler<Response>() {
>>>>>>> @Override
>>>>>>> public Response onCompleted(Response response) throws
>>>>>>> Exception {
>>>>>>> ...
>>>>>>> emitTuple(response, tuple);
>>>>>>> return response;
>>>>>>> }
>>>>>>> });
>>>>>>> }
>>>>>>> //we are synchronizing basicOutputCollector
>>>>>>> // because we have callbacks and we need to make sure all acks
>>>>>>> are called from the same thread
>>>>>>> private void emitTuple(Response response, Tuple tuple) {
>>>>>>> synchronized (outputCollector) {
>>>>>>> outputCollector.emit(tuple, new Values(response));
>>>>>>> outputCollector.ack(tuple);
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> Please any leads about that?
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>
>>>>>
>>>
>
Re: NullPointerException from deep inside Storm
Posted by Idan Fridman <id...@gmail.com>.
Hi,
Here is the almost full bolt I am using which causing that exception:
public class AsyncBolt extends BaseRichBolt {
private AsyncHttpClient asyncHttpClient;
private OutputCollector outputCollector;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("pushMessageResponse"));
}
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
asyncHttpClient = new AsyncHttpClient();
outputCollector = collector;
}
@Override
public void execute(final Tuple tuple) {
final PushMessageRequestDTO pushMessageRequestDTO =
(PushMessageRequestDTO) tuple.getValueByField("pushMessage");
String url = "some.url";
asyncHttpClient.preparePost(url).execute(new
AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(Response response) throws Exception
{
PushMessageResponseDTO pushMessageResponseDTO = new
PushMessageResponseDTO(pushMessageRequestDTO);
emitTuple(pushMessageResponseDTO, tuple);
return response;
}
});
}
//we are synchronizing basicOutputCollector
// because we have callbacks and we need to make sure all acks are
called from the same thread
private void emitTuple(PushMessageResponseDTO pushMessageResponseDTO,
Tuple tuple) {
synchronized (outputCollector) {
outputCollector.emit(tuple, new Values(pushMessageResponseDTO));
outputCollector.ack(tuple);
}
}
}
2015-02-10 20:27 GMT+02:00 Idan Fridman <id...@gmail.com>:
> Yes I cutted those from this post for the sake simplicity aswell. I can
> past the whole bolt it just has some intern logic
> On Feb 10, 2015 8:14 PM, "Michael Rose" <mi...@fullcontact.com> wrote:
>
>> Your output fields declarer isn't invoked. Is it empty in your actual
>> implementation as well?
>>
>> Perhaps you can post a gist with your full file?
>>
>> *Michael Rose*
>> Senior Platform Engineer
>> *Full*Contact | fullcontact.com
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>> m: +1.720.837.1357 | t: @xorlev
>>
>>
>> All Your Contacts, Updated and In One Place.
>> Try FullContact for Free
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>
>> On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <id...@gmail.com>
>> wrote:
>>
>>> I actually created pojo of mine and emitted it. I edited here the code
>>> for the sake of simplicity. So you think I should try convert the Object
>>> into byte/string and then emmit? What about the outputcollector
>>> synchronizing ? Does it make sense?
>>> On Feb 10, 2015 8:00 PM, "Michael Rose" <mi...@fullcontact.com> wrote:
>>>
>>>> Out of curiosity, have you tried just emitting the response as a
>>>> string/byte array vs. the whole response object?
>>>>
>>>> *Michael Rose*
>>>> Senior Platform Engineer
>>>> *Full*Contact | fullcontact.com
>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>> m: +1.720.837.1357 | t: @xorlev
>>>>
>>>>
>>>> All Your Contacts, Updated and In One Place.
>>>> Try FullContact for Free
>>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>>
>>>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <id...@gmail.com>
>>>> wrote:
>>>>
>>>>> One note: it's not BasicOutputCollector. but OutputCollector
>>>>>
>>>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
>>>>>
>>>>>> I am opening async call to a webservice from a bolt.
>>>>>>
>>>>>> I'am opening socket and retrieving the result asynchronous(using
>>>>>> external AsycHttpClient library) and after that I am emitting to the next
>>>>>> bolt
>>>>>>
>>>>>> I asked and read that if I synchronized the outputCollector it will
>>>>>> make sure all that all acks and callbacks will be called from the same
>>>>>> Thread.
>>>>>>
>>>>>> However after load-test I started to get this:
>>>>>>
>>>>>>
>>>>>> java.lang.RuntimeException: java.lang.NullPointerException at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>>> at
>>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>>> at
>>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>>> at
>>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>>>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.lang.NullPointerException at
>>>>>> clojure.lang.RT.intCast(RT.java:1087) at
>>>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>>>> at
>>>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>>>> at
>>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>>
>>>>>> Thats my bolt:
>>>>>>
>>>>>> public class AsyncBolt extends BaseRichBolt {
>>>>>>
>>>>>> ...
>>>>>>
>>>>>> @Override
>>>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>> }
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>> public void prepare(Map stormConf, TopologyContext context,
>>>>>> OutputCollector collector) {
>>>>>> asyncHttpClient = new AsyncHttpClient();
>>>>>> outputCollector = collector;
>>>>>> }
>>>>>>
>>>>>>
>>>>>> @Override
>>>>>> public void execute(final Tuple tuple) {
>>>>>>
>>>>>> asyncHttpClient.preparePost(url).execute(new
>>>>>> AsyncCompletionHandler<Response>() {
>>>>>> @Override
>>>>>> public Response onCompleted(Response response) throws
>>>>>> Exception {
>>>>>> ...
>>>>>> emitTuple(response, tuple);
>>>>>> return response;
>>>>>> }
>>>>>> });
>>>>>> }
>>>>>> //we are synchronizing basicOutputCollector
>>>>>> // because we have callbacks and we need to make sure all acks
>>>>>> are called from the same thread
>>>>>> private void emitTuple(Response response, Tuple tuple) {
>>>>>> synchronized (outputCollector) {
>>>>>> outputCollector.emit(tuple, new Values(response));
>>>>>> outputCollector.ack(tuple);
>>>>>> }
>>>>>> }
>>>>>> }
>>>>>>
>>>>>> Please any leads about that?
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>
>>>>
>>
Re: NullPointerException from deep inside Storm
Posted by Idan Fridman <id...@gmail.com>.
Yes I cutted those from this post for the sake simplicity aswell. I can
past the whole bolt it just has some intern logic
On Feb 10, 2015 8:14 PM, "Michael Rose" <mi...@fullcontact.com> wrote:
> Your output fields declarer isn't invoked. Is it empty in your actual
> implementation as well?
>
> Perhaps you can post a gist with your full file?
>
> *Michael Rose*
> Senior Platform Engineer
> *Full*Contact | fullcontact.com
> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
> m: +1.720.837.1357 | t: @xorlev
>
>
> All Your Contacts, Updated and In One Place.
> Try FullContact for Free
> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>
> On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <id...@gmail.com>
> wrote:
>
>> I actually created pojo of mine and emitted it. I edited here the code
>> for the sake of simplicity. So you think I should try convert the Object
>> into byte/string and then emmit? What about the outputcollector
>> synchronizing ? Does it make sense?
>> On Feb 10, 2015 8:00 PM, "Michael Rose" <mi...@fullcontact.com> wrote:
>>
>>> Out of curiosity, have you tried just emitting the response as a
>>> string/byte array vs. the whole response object?
>>>
>>> *Michael Rose*
>>> Senior Platform Engineer
>>> *Full*Contact | fullcontact.com
>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>> m: +1.720.837.1357 | t: @xorlev
>>>
>>>
>>> All Your Contacts, Updated and In One Place.
>>> Try FullContact for Free
>>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>>
>>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <id...@gmail.com>
>>> wrote:
>>>
>>>> One note: it's not BasicOutputCollector. but OutputCollector
>>>>
>>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
>>>>
>>>>> I am opening async call to a webservice from a bolt.
>>>>>
>>>>> I'am opening socket and retrieving the result asynchronous(using
>>>>> external AsycHttpClient library) and after that I am emitting to the next
>>>>> bolt
>>>>>
>>>>> I asked and read that if I synchronized the outputCollector it will
>>>>> make sure all that all acks and callbacks will be called from the same
>>>>> Thread.
>>>>>
>>>>> However after load-test I started to get this:
>>>>>
>>>>>
>>>>> java.lang.RuntimeException: java.lang.NullPointerException at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>>> at
>>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>>> at
>>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>>> at
>>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.lang.NullPointerException at
>>>>> clojure.lang.RT.intCast(RT.java:1087) at
>>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>>> at
>>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>>> at
>>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>>
>>>>> Thats my bolt:
>>>>>
>>>>> public class AsyncBolt extends BaseRichBolt {
>>>>>
>>>>> ...
>>>>>
>>>>> @Override
>>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>> }
>>>>>
>>>>>
>>>>> @Override
>>>>> public void prepare(Map stormConf, TopologyContext context,
>>>>> OutputCollector collector) {
>>>>> asyncHttpClient = new AsyncHttpClient();
>>>>> outputCollector = collector;
>>>>> }
>>>>>
>>>>>
>>>>> @Override
>>>>> public void execute(final Tuple tuple) {
>>>>>
>>>>> asyncHttpClient.preparePost(url).execute(new
>>>>> AsyncCompletionHandler<Response>() {
>>>>> @Override
>>>>> public Response onCompleted(Response response) throws
>>>>> Exception {
>>>>> ...
>>>>> emitTuple(response, tuple);
>>>>> return response;
>>>>> }
>>>>> });
>>>>> }
>>>>> //we are synchronizing basicOutputCollector
>>>>> // because we have callbacks and we need to make sure all acks are
>>>>> called from the same thread
>>>>> private void emitTuple(Response response, Tuple tuple) {
>>>>> synchronized (outputCollector) {
>>>>> outputCollector.emit(tuple, new Values(response));
>>>>> outputCollector.ack(tuple);
>>>>> }
>>>>> }
>>>>> }
>>>>>
>>>>> Please any leads about that?
>>>>>
>>>>> Thank you.
>>>>>
>>>>
>>>
>
Re: NullPointerException from deep inside Storm
Posted by Michael Rose <mi...@fullcontact.com>.
Your output fields declarer isn't invoked. Is it empty in your actual
implementation as well?
Perhaps you can post a gist with your full file?
*Michael Rose*
Senior Platform Engineer
*Full*Contact | fullcontact.com
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
m: +1.720.837.1357 | t: @xorlev
All Your Contacts, Updated and In One Place.
Try FullContact for Free
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
On Tue, Feb 10, 2015 at 11:05 AM, Idan Fridman <id...@gmail.com> wrote:
> I actually created pojo of mine and emitted it. I edited here the code for
> the sake of simplicity. So you think I should try convert the Object into
> byte/string and then emmit? What about the outputcollector synchronizing ?
> Does it make sense?
> On Feb 10, 2015 8:00 PM, "Michael Rose" <mi...@fullcontact.com> wrote:
>
>> Out of curiosity, have you tried just emitting the response as a
>> string/byte array vs. the whole response object?
>>
>> *Michael Rose*
>> Senior Platform Engineer
>> *Full*Contact | fullcontact.com
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>> m: +1.720.837.1357 | t: @xorlev
>>
>>
>> All Your Contacts, Updated and In One Place.
>> Try FullContact for Free
>> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>>
>> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <id...@gmail.com>
>> wrote:
>>
>>> One note: it's not BasicOutputCollector. but OutputCollector
>>>
>>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
>>>
>>>> I am opening async call to a webservice from a bolt.
>>>>
>>>> I'am opening socket and retrieving the result asynchronous(using
>>>> external AsycHttpClient library) and after that I am emitting to the next
>>>> bolt
>>>>
>>>> I asked and read that if I synchronized the outputCollector it will
>>>> make sure all that all acks and callbacks will be called from the same
>>>> Thread.
>>>>
>>>> However after load-test I started to get this:
>>>>
>>>>
>>>> java.lang.RuntimeException: java.lang.NullPointerException at
>>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>>> at
>>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>>> at
>>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>>> at
>>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.NullPointerException at
>>>> clojure.lang.RT.intCast(RT.java:1087) at
>>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>>> at
>>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>>> at
>>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>>
>>>> Thats my bolt:
>>>>
>>>> public class AsyncBolt extends BaseRichBolt {
>>>>
>>>> ...
>>>>
>>>> @Override
>>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>> }
>>>>
>>>>
>>>> @Override
>>>> public void prepare(Map stormConf, TopologyContext context,
>>>> OutputCollector collector) {
>>>> asyncHttpClient = new AsyncHttpClient();
>>>> outputCollector = collector;
>>>> }
>>>>
>>>>
>>>> @Override
>>>> public void execute(final Tuple tuple) {
>>>>
>>>> asyncHttpClient.preparePost(url).execute(new
>>>> AsyncCompletionHandler<Response>() {
>>>> @Override
>>>> public Response onCompleted(Response response) throws
>>>> Exception {
>>>> ...
>>>> emitTuple(response, tuple);
>>>> return response;
>>>> }
>>>> });
>>>> }
>>>> //we are synchronizing basicOutputCollector
>>>> // because we have callbacks and we need to make sure all acks are
>>>> called from the same thread
>>>> private void emitTuple(Response response, Tuple tuple) {
>>>> synchronized (outputCollector) {
>>>> outputCollector.emit(tuple, new Values(response));
>>>> outputCollector.ack(tuple);
>>>> }
>>>> }
>>>> }
>>>>
>>>> Please any leads about that?
>>>>
>>>> Thank you.
>>>>
>>>
>>
Re: NullPointerException from deep inside Storm
Posted by Idan Fridman <id...@gmail.com>.
I actually created pojo of mine and emitted it. I edited here the code for
the sake of simplicity. So you think I should try convert the Object into
byte/string and then emmit? What about the outputcollector synchronizing ?
Does it make sense?
On Feb 10, 2015 8:00 PM, "Michael Rose" <mi...@fullcontact.com> wrote:
> Out of curiosity, have you tried just emitting the response as a
> string/byte array vs. the whole response object?
>
> *Michael Rose*
> Senior Platform Engineer
> *Full*Contact | fullcontact.com
> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
> m: +1.720.837.1357 | t: @xorlev
>
>
> All Your Contacts, Updated and In One Place.
> Try FullContact for Free
> <https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
>
> On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <id...@gmail.com> wrote:
>
>> One note: it's not BasicOutputCollector. but OutputCollector
>>
>> 2015-02-10 18:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
>>
>>> I am opening async call to a webservice from a bolt.
>>>
>>> I'am opening socket and retrieving the result asynchronous(using
>>> external AsycHttpClient library) and after that I am emitting to the next
>>> bolt
>>>
>>> I asked and read that if I synchronized the outputCollector it will make
>>> sure all that all acks and callbacks will be called from the same Thread.
>>>
>>> However after load-test I started to get this:
>>>
>>>
>>> java.lang.RuntimeException: java.lang.NullPointerException at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>>> at
>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>>> at
>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>>> at
>>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException at
>>> clojure.lang.RT.intCast(RT.java:1087) at
>>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>>> at
>>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>>> at
>>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>>
>>> Thats my bolt:
>>>
>>> public class AsyncBolt extends BaseRichBolt {
>>>
>>> ...
>>>
>>> @Override
>>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>> }
>>>
>>>
>>> @Override
>>> public void prepare(Map stormConf, TopologyContext context,
>>> OutputCollector collector) {
>>> asyncHttpClient = new AsyncHttpClient();
>>> outputCollector = collector;
>>> }
>>>
>>>
>>> @Override
>>> public void execute(final Tuple tuple) {
>>>
>>> asyncHttpClient.preparePost(url).execute(new
>>> AsyncCompletionHandler<Response>() {
>>> @Override
>>> public Response onCompleted(Response response) throws
>>> Exception {
>>> ...
>>> emitTuple(response, tuple);
>>> return response;
>>> }
>>> });
>>> }
>>> //we are synchronizing basicOutputCollector
>>> // because we have callbacks and we need to make sure all acks are
>>> called from the same thread
>>> private void emitTuple(Response response, Tuple tuple) {
>>> synchronized (outputCollector) {
>>> outputCollector.emit(tuple, new Values(response));
>>> outputCollector.ack(tuple);
>>> }
>>> }
>>> }
>>>
>>> Please any leads about that?
>>>
>>> Thank you.
>>>
>>
>
Re: NullPointerException from deep inside Storm
Posted by Michael Rose <mi...@fullcontact.com>.
Out of curiosity, have you tried just emitting the response as a
string/byte array vs. the whole response object?
*Michael Rose*
Senior Platform Engineer
*Full*Contact | fullcontact.com
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
m: +1.720.837.1357 | t: @xorlev
All Your Contacts, Updated and In One Place.
Try FullContact for Free
<https://www.fullcontact.com/?utm_source=FullContact%20-%20Email%20Signatures&utm_medium=email&utm_content=Signature%20Link&utm_campaign=FullContact%20-%20Email%20Signatures>
On Tue, Feb 10, 2015 at 9:30 AM, Idan Fridman <id...@gmail.com> wrote:
> One note: it's not BasicOutputCollector. but OutputCollector
>
> 2015-02-10 18:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
>
>> I am opening async call to a webservice from a bolt.
>>
>> I'am opening socket and retrieving the result asynchronous(using external
>> AsycHttpClient library) and after that I am emitting to the next bolt
>>
>> I asked and read that if I synchronized the outputCollector it will make
>> sure all that all acks and callbacks will be called from the same Thread.
>>
>> However after load-test I started to get this:
>>
>>
>> java.lang.RuntimeException: java.lang.NullPointerException at
>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>> at
>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>> at
>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>> at
>> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
>> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
>> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.NullPointerException at
>> clojure.lang.RT.intCast(RT.java:1087) at
>> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
>> at
>> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
>> at
>> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>>
>> Thats my bolt:
>>
>> public class AsyncBolt extends BaseRichBolt {
>>
>> ...
>>
>> @Override
>> public void declareOutputFields(OutputFieldsDeclarer declarer) {
>> }
>>
>>
>> @Override
>> public void prepare(Map stormConf, TopologyContext context,
>> OutputCollector collector) {
>> asyncHttpClient = new AsyncHttpClient();
>> outputCollector = collector;
>> }
>>
>>
>> @Override
>> public void execute(final Tuple tuple) {
>>
>> asyncHttpClient.preparePost(url).execute(new
>> AsyncCompletionHandler<Response>() {
>> @Override
>> public Response onCompleted(Response response) throws
>> Exception {
>> ...
>> emitTuple(response, tuple);
>> return response;
>> }
>> });
>> }
>> //we are synchronizing basicOutputCollector
>> // because we have callbacks and we need to make sure all acks are
>> called from the same thread
>> private void emitTuple(Response response, Tuple tuple) {
>> synchronized (outputCollector) {
>> outputCollector.emit(tuple, new Values(response));
>> outputCollector.ack(tuple);
>> }
>> }
>> }
>>
>> Please any leads about that?
>>
>> Thank you.
>>
>
Re: NullPointerException from deep inside Storm
Posted by Idan Fridman <id...@gmail.com>.
One note: it's not BasicOutputCollector. but OutputCollector
2015-02-10 18:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
> I am opening async call to a webservice from a bolt.
>
> I'am opening socket and retrieving the result asynchronous(using external
> AsycHttpClient library) and after that I am emitting to the next bolt
>
> I asked and read that if I synchronized the outputCollector it will make
> sure all that all acks and callbacks will be called from the same Thread.
>
> However after load-test I started to get this:
>
>
> java.lang.RuntimeException: java.lang.NullPointerException at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> at
> backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94)
> at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) at
> clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException at
> clojure.lang.RT.intCast(RT.java:1087) at
> backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129)
> at
> backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258)
> at
> backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
>
> Thats my bolt:
>
> public class AsyncBolt extends BaseRichBolt {
>
> ...
>
> @Override
> public void declareOutputFields(OutputFieldsDeclarer declarer) {
> }
>
>
> @Override
> public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
> asyncHttpClient = new AsyncHttpClient();
> outputCollector = collector;
> }
>
>
> @Override
> public void execute(final Tuple tuple) {
>
> asyncHttpClient.preparePost(url).execute(new
> AsyncCompletionHandler<Response>() {
> @Override
> public Response onCompleted(Response response) throws
> Exception {
> ...
> emitTuple(response, tuple);
> return response;
> }
> });
> }
> //we are synchronizing basicOutputCollector
> // because we have callbacks and we need to make sure all acks are
> called from the same thread
> private void emitTuple(Response response, Tuple tuple) {
> synchronized (outputCollector) {
> outputCollector.emit(tuple, new Values(response));
> outputCollector.ack(tuple);
> }
> }
> }
>
> Please any leads about that?
>
> Thank you.
>