Posted to user@storm.apache.org by Adrien Carreira <ac...@reportlinker.com> on 2016/05/09 07:57:11 UTC

Spout Thread Waiting

Hi there,

I'm using Storm to build a web crawler with the StormCrawler SDK.

I'm also using Redis to store newly discovered links.

I have a spout that consumes those URLs. After a lot of debugging, I've built
the spout like this:

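import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Storm 1.x package names (Storm 0.x used backtype.storm.* instead)
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OutlinkSpoutRedis extends BaseRichSpout {

    private static final Logger LOG = LoggerFactory
            .getLogger(OutlinkSpoutRedis.class);

    // URLs read from Redis by ProducerThread, waiting to be emitted
    private final LinkedBlockingQueue<Values> queue = new LinkedBlockingQueue<>();
    // Message IDs handed off by ack()/fail() to the background threads
    private final LinkedBlockingQueue<String> ackQueue = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<String> failQueue = new LinkedBlockingQueue<>();
    // URLs emitted but not yet acked or failed
    private final Map<String, String> beingProcessed = new ConcurrentHashMap<>();

    private SpoutOutputCollector _collector;
    private volatile boolean activated = true;

    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this._collector = collector;
        // Starting ProducerThread and the ack/fail threads is omitted here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "metadata"));
    }

    @Override
    public void nextTuple() {
        LOG.info(">>> Calling nextTuple");

        if (beingProcessed.size() >= 200000) {
            LOG.info("Too many tuples being processed");
            Utils.sleep(50);
            return;
        }

        LOG.info("Polling from queue");
        Values ret = queue.poll(); // non-blocking: returns null when empty

        if (ret == null) {
            LOG.info("Polling from queue = null");
            Utils.sleep(50);
            return;
        }

        LOG.info("Emitting one url");

        String url = ret.get(0).toString();
        beingProcessed.put(url, "");

        // The URL doubles as the message ID passed back to ack()/fail()
        this._collector.emit(ret, url);
    }

    @Override
    public void ack(Object msgId) {
        LOG.info("Acking");
        this.beingProcessed.remove(msgId);
        this.ackQueue.offer((String) msgId);
    }

    @Override
    public void fail(Object msgId) {
        LOG.error("Fail tuple {}", msgId);
        this.beingProcessed.remove(msgId);
        this.failQueue.offer((String) msgId);
    }

    private class ProducerThread extends Thread {
        @Override
        public void run() {
            while (activated) {
                try {
                    // 'queue' is the enclosing spout's field, not a Thread field
                    if (queue.size() <= 1000) {
                        this.populateQueue();
                    }

                    Utils.sleep(100);
                } catch (Exception e) {
                    LOG.error("Error reading queues from redis", e);
                }
            }
        }

        private void populateQueue() {
            // Calling Redis to populate Queue (elided); for each URL read:
            // queue.offer(new Values(url, metadata));
        }
    }

    private abstract class AckFailThread extends Thread {
        // ackQueue or failQueue, depending on the subclass
        private final LinkedBlockingQueue<String> source;

        AckFailThread(LinkedBlockingQueue<String> source) {
            this.source = source;
        }

        @Override
        public void run() {
            while (activated) {
                try {
                    // Wait at most 1 s so the loop can notice deactivation
                    String message = source.poll(1, TimeUnit.SECONDS);

                    if (message != null) {
                        this.handleMessage(message);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        protected abstract void handleMessage(String message);
    }
}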


I've removed unnecessary code.
To summarize: nextTuple polls from a queue (populated by another thread),
and ack and fail push the message ID onto a queue that is consumed by one of
two other threads. So none of those three methods blocks.
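The hand-off described above can be illustrated with a minimal, JDK-only sketch. `HandOffDemo` and its methods are hypothetical stand-ins, not Storm's API: `poll()` without a timeout returns `null` immediately on an empty queue, so the caller never blocks, while a background worker drains a second queue with a bounded `poll(timeout)` so that it can notice a stop flag.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical JDK-only stand-in for the spout's queues (no Storm classes).
class HandOffDemo {
    final LinkedBlockingQueue<String> toEmit = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<String> acked = new LinkedBlockingQueue<>();
    final List<String> handled = new CopyOnWriteArrayList<>();
    private volatile boolean running = true;

    // Like nextTuple(): poll() without a timeout returns null at once if empty.
    String nextOrNull() {
        return toEmit.poll();
    }

    // Like ack(): hand the message ID off and return immediately.
    void ack(String msgId) {
        acked.offer(msgId);
    }

    // Like an AckFailThread: drain with a bounded wait so the loop can
    // notice the stop flag, then finish whatever is left after stopping.
    Thread startAckWorker() {
        Thread worker = new Thread(() -> {
            while (running || !acked.isEmpty()) {
                try {
                    String id = acked.poll(100, TimeUnit.MILLISECONDS);
                    if (id != null) {
                        handled.add(id); // real code would update Redis here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        worker.start();
        return worker;
    }

    // Clear the flag and wait for the worker to drain the queue and exit.
    void stopAndWait(Thread worker) {
        running = false;
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because none of these calls blocks for long, this pattern by itself should not stall the thread that invokes nextTuple, ack and fail.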

My problem: while the topology is running, my spout sometimes isn't called
for about thirty seconds, even though there are still messages in the
nextTuple queue waiting to be consumed. The spout isn't acking or failing
either, so why is the spout thread blocked?

Thanks

Re: Spout Thread Waiting

Posted by Adrien Carreira <ac...@reportlinker.com>.
Hi Julien,

I'm using it as a metric of how many tuples are being processed.

The problem isn't that the value is set too high; the problem is that
nextTuple is not being called.

If you look at this screenshot:

- Green: queue size (another thread in the spout, populating tuples from
ES)
- Blue: being-processed queue

As you can see, at the beginning nextTuple is called and the being-processed
count rises until it reaches a maximum, then it decreases because the
topology doesn't call nextTuple anymore...

[image: Inline image 1]

I don't understand why nextTuple isn't called anymore...

2016-05-10 11:01 GMT+02:00 Julien Nioche <li...@gmail.com>:

>
>> I haven't set topology.max.spout.pending; I'm using the default value, but
>> I'm watching an internal Set to check the number of tuples being processed.
>>
>
> Found it in the snippet you posted earlier. This duplicates what
> topology.max.spout.pending does, so unless you have another use for that
> internal set, it would be a good idea to rely on the default mechanism
> instead.
>
> 200000 is a very large value. Do you have any idea of how diverse your URLs are in terms of hostname / domain / IP?
>
> Bear in mind that the FetcherBolts (both implementations) are polite and
> will block URLs until the minimum configured amount of time has elapsed
> since the previous call to the same server has completed. Also, if you have
> more URLs in flight than fetching threads, they will sit and wait in
> the queues. Either way, this might trigger a timeout after a while (30
> secs by default =>
> https://github.com/apache/storm/blob/master/conf/defaults.yaml#L228),
> which could explain what you are experiencing.
>
>
>> I've logged the nextTuple, fail and ack methods. In the log, sometimes none
>> of these methods are called for about thirty seconds... it seems that the
>> thread is busy doing other things.
>>
>
> See above.
>
>
>>
>> Yeah, I'm using your SDK, it's great btw. I've just changed the outlinks
>> indexer to store data in Redis :-)
>>
>
> Do you mean a StatusUpdaterBolt
> <https://github.com/DigitalPebble/storm-crawler/blob/master/core/src/main/java/com/digitalpebble/storm/crawler/persistence/AbstractStatusUpdaterBolt.java>
> ?
>
> What does the rest of your topology look like?
>
> This thread is quite specific to StormCrawler and might not be of interest
> to other Storm users; feel free to continue the discussion on
> [http://groups.google.com/group/digitalpebble] or use the stormcrawler tag
> <http://stackoverflow.com/questions/tagged/stormcrawler> on StackOverflow.
>
> HTH
>
> Julien

Re: Spout Thread Waiting

Posted by Julien Nioche <li...@gmail.com>.
> I haven't set topology.max.spout.pending; I'm using the default value, but
> I'm watching an internal Set to check the number of tuples being processed.
>

Found it in the snippet you posted earlier. This duplicates what
topology.max.spout.pending does, so unless you have another use for that
internal set, it would be a good idea to rely on the default mechanism
instead.
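For reference, the built-in mechanism is configured on the topology, e.g. with `conf.setMaxSpoutPending(n)` on an `org.apache.storm.Config` (key `topology.max.spout.pending`). Storm then stops calling `nextTuple()` on a spout task while `n` of its tuples are still pending, which is the same gating the internal set implements by hand. The JDK-only model below only illustrates that rule; `PendingGate` is a hypothetical name and this is a simplified sketch, not Storm's executor code.

```java
// Hypothetical, simplified model of max-spout-pending gating (not Storm code).
class PendingGate {
    private final int maxPending;
    private int pending = 0;

    PendingGate(int maxPending) {
        this.maxPending = maxPending;
    }

    // Rule being modelled: ask the spout for a tuple only while below the cap.
    boolean mayCallNextTuple() {
        return pending < maxPending;
    }

    // A tuple was emitted with a message ID and is now pending.
    void emitted() {
        pending++;
    }

    // Both ack and fail (including a timeout reported as fail) free a slot.
    void ackedOrFailed() {
        if (pending > 0) {
            pending--;
        }
    }

    int pending() {
        return pending;
    }
}
```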

200000 is a very large value. Do you have any idea of how diverse your
URLs are in terms of hostname / domain / IP?

Bear in mind that the FetcherBolts (both implementations) are polite and
will block URLs until the minimum configured amount of time has elapsed
since the previous call to the same server has completed. Also, if you have
more URLs in flight than fetching threads, they will sit and wait in
the queues. Either way, this might trigger a timeout after a while (30
secs by default =>
https://github.com/apache/storm/blob/master/conf/defaults.yaml#L228), which
could explain what you are experiencing.
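To make the timeout risk concrete, here is a back-of-the-envelope model under stated assumptions: URLs for the same host are fetched one after another with a fixed politeness delay (the 1 s used below is a hypothetical value, not necessarily the configured one), so the k-th queued URL for a host (k = 0, 1, ...) waits roughly k × delay, and it counts as timed out once that wait exceeds the message timeout (30 s by default, `topology.message.timeout.secs`, settable with `Config.setMessageTimeoutSecs(int)`). Fetch durations and the number of fetch threads are ignored, so real waits would be longer. `PolitenessTimeout` is a hypothetical helper.

```java
// Back-of-the-envelope model (hypothetical; ignores fetch time and threads):
// the k-th URL queued for a host (k = 0, 1, ...) waits about k * delaySecs.
class PolitenessTimeout {

    // URLs in one host queue whose estimated wait exceeds the timeout.
    static long timedOutPerHost(long urlsForHost, double delaySecs, double timeoutSecs) {
        long count = 0;
        for (long k = 0; k < urlsForHost; k++) {
            if (k * delaySecs > timeoutSecs) {
                count++;
            }
        }
        return count;
    }

    // Same estimate for in-flight URLs spread evenly over 'hosts' hosts
    // (any remainder of inFlight / hosts is ignored).
    static long timedOutTotal(long inFlight, long hosts, double delaySecs, double timeoutSecs) {
        long perHost = inFlight / hosts;
        return hosts * timedOutPerHost(perHost, delaySecs, timeoutSecs);
    }
}
```

Under these assumptions, 200000 URLs in flight spread over 100 hosts gives 2000 URLs per host, of which 1969 wait longer than 30 s (196900 in total); spread over 10000 hosts, none would. This is why the diversity of hosts matters as much as the size of the cap.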


> I've logged the nextTuple, fail and ack methods. In the log, sometimes none
> of these methods are called for about thirty seconds... it seems that the
> thread is busy doing other things.
>

See above.


>
> Yeah, I'm using your SDK, it's great btw. I've just changed the outlinks
> indexer to store data in Redis :-)
>

Do you mean a StatusUpdaterBolt
<https://github.com/DigitalPebble/storm-crawler/blob/master/core/src/main/java/com/digitalpebble/storm/crawler/persistence/AbstractStatusUpdaterBolt.java>
?

What does the rest of your topology look like?

This thread is quite specific to StormCrawler and might not be of interest
to other Storm users; feel free to continue the discussion on
[http://groups.google.com/group/digitalpebble] or use the stormcrawler tag
<http://stackoverflow.com/questions/tagged/stormcrawler> on StackOverflow.

HTH

Julien


-- 

*Open Source Solutions for Text Engineering*

http://www.digitalpebble.com
http://digitalpebble.blogspot.com/
#digitalpebble <http://twitter.com/digitalpebble>

Re: Spout Thread Waiting

Posted by Adrien Carreira <ac...@reportlinker.com>.
I haven't set topology.max.spout.pending; I'm using the default value, but I'm
watching an internal Set to check the number of tuples being processed.

I've logged the nextTuple, fail and ack methods. In the log, sometimes none of
these methods are called for about thirty seconds... it seems that the
thread is busy doing other things.
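One way to quantify those silent periods is to record the time of each nextTuple/ack/fail call and keep the longest gap seen. The sketch below is a hypothetical JDK-only helper (`CallGapTracker` is not part of Storm or StormCrawler); the timestamp is passed in so that it can be tested, and in the spout one would call `record(System.nanoTime())` at the top of each of the three methods and log `maxGapMillis()` periodically.

```java
// Hypothetical helper (not part of Storm or StormCrawler): remembers the
// longest interval between two successive calls to record().
class CallGapTracker {
    private boolean hasLast = false;
    private long lastNanos;
    private long maxGapNanos = 0;

    // Pass System.nanoTime(); its values may be negative, hence the flag
    // instead of a sentinel value.
    synchronized void record(long nowNanos) {
        if (hasLast) {
            maxGapNanos = Math.max(maxGapNanos, nowNanos - lastNanos);
        }
        lastNanos = nowNanos;
        hasLast = true;
    }

    // Longest gap observed so far, truncated to whole milliseconds.
    synchronized long maxGapMillis() {
        return maxGapNanos / 1_000_000;
    }
}
```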

Yeah, I'm using your SDK, it's great btw. I've just changed the outlinks
indexer to store data in Redis :-)

2016-05-09 22:27 GMT+02:00 Julien Nioche <li...@gmail.com>:

> Hi Adrien
>
> Did you set a value for max spout pending? Could it be that you have
> reached the max number of tuples in process? Do you see acks or fails
> happen during that period?
>
> Great to hear that you are using StormCrawler BTW
>
> Julien

Re: Spout Thread Waiting

Posted by Julien Nioche <li...@gmail.com>.
Hi Adrien

Did you set a value for max spout pending? Could it be that you have reached
the max number of tuples in process? Do you see acks or fails happen during
that period?

Great to hear that you are using StormCrawler BTW

Julien


On 9 May 2016 at 20:48, Adrien Carreira <ac...@reportlinker.com> wrote:

> I think the problem is that when my topology is running, the thread calling
> nextTuple seems to be busy... Why isn't the method called?
>
> Can someone point me to some documentation, or to the code that calls
> nextTuple, just to understand what is blocking?
>
> Thank you guys

Re: Spout Thread Waiting

Posted by Adrien Carreira <ac...@reportlinker.com>.
I think the problem is that when my topology is running, the thread calling
nextTuple seems to be busy... Why isn't the method called?

Can someone point me to some documentation, or to the code that calls
nextTuple, just to understand what is blocking?

Thank you guys