Posted to user@storm.apache.org by Idan Fridman <id...@gmail.com> on 2015/03/10 13:58:04 UTC

Topology is failing using HttpClient on high throughput

My topology includes a bolt which opens an HTTP request to a web service.
The average response time is 500 milliseconds (however, sometimes it takes
longer).

* I added timeout handling, and I am using KafkaSpout.

When I send messages one by one everything works fine, but

under high throughput *that bolt gets stuck and nothing gets into it
anymore*, and the worst part is that I am seeing a "replay" of the messages.

The only way to get through this is to reset Kafka's offset; otherwise
ZooKeeper still holds Kafka's offset and the messages keep being replayed.


1. *Why are messages being replayed? I don't need that.*
2. Here is my code example of the ExternalServiceOutputBolt:

package com.mycompany.push.topology;

import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import com.mycompany.push.dto.PushMessageRequestDTO;
import com.mycompany.push.dto.PushMessageResponseDTO;
import org.apache.http.NameValuePair;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;



public class ExternalServiceOutputBolt extends BaseBasicBolt {

    private static final Logger log = LoggerFactory.getLogger(ExternalServiceOutputBolt.class);

    private CloseableHttpClient httpClient;
    private String externalServiceGraphUrl;

    // counters created in prepare() and used in execute()
    private transient CountMetric received_message_counter;
    private transient CountMetric returned_from_externalService_counter;
    private transient CountMetric received_status_200_counter;
    private transient CountMetric received_not_status_200_counter;
    private transient CountMetric received_time_out_counter;
    private transient CountMetric received_fail_status_counter;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("pushMessageResponse"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
        initMetrics(context);
        httpClient = getHttpClientInstance();
    }

    private void initMetrics(TopologyContext context) {
        // register the counters with Storm's metrics system (names and 60s bucket are illustrative)
        received_message_counter = context.registerMetric("received_message", new CountMetric(), 60);
        returned_from_externalService_counter = context.registerMetric("returned_from_externalService", new CountMetric(), 60);
        received_status_200_counter = context.registerMetric("received_status_200", new CountMetric(), 60);
        received_not_status_200_counter = context.registerMetric("received_not_status_200", new CountMetric(), 60);
        received_time_out_counter = context.registerMetric("received_time_out", new CountMetric(), 60);
        received_fail_status_counter = context.registerMetric("received_fail_status", new CountMetric(), 60);
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        try {
            received_message_counter.incr();
            final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
            if (pushMessageRequestDTO != null) {
                PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
                returned_from_externalService_counter.incr();
                System.out.println("externalServiceOutputBolt, emit tuple with snid=" + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
                basicOutputCollector.emit(new Values(pushMessageResponseDTO));
            }
        } catch (Exception e) {
            log.error("externalServiceOutputBolt. Error", e);
        }
    }

    private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
        PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
        CloseableHttpResponse response = null;
        try {
            HttpPost post = new HttpPost(externalServiceGraphUrl); // URL read from the topology config in prepare()
            List<NameValuePair> urlParameters = new ArrayList<NameValuePair>();
            // TODO: populate the form parameters from pushMessageRequestDTO
            post.setEntity(new UrlEncodedFormEntity(urlParameters));
            response = httpClient.execute(post);
            response.getEntity(); // body is not used; closing the response below releases the pooled connection
            if (response.getStatusLine().getStatusCode() != 200) {
                received_not_status_200_counter.incr();
            } else {
                received_status_200_counter.incr();
            }
            log.debug("externalServiceOutputBolt.onCompleted, pushMessageResponseDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
            return pushMessageResponseDTO;
        } catch (SocketTimeoutException e) {
            received_time_out_counter.incr();
            log.error("externalServiceOutputBolt, TimeoutException", e);
        } catch (Throwable t) {
            received_fail_status_counter.incr();
            pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
            if (t.getMessage() != null) {
                log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
            }
        } finally {
            if (response != null) {
                response.close();
            }
        }
        return pushMessageResponseDTO;
    }

    private CloseableHttpClient getHttpClientInstance() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setDefaultMaxPerRoute(100);
        cm.setMaxTotal(500);
        int timeout = 4;
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(timeout * 1000) // in millis
                .setConnectionRequestTimeout(timeout * 1000)
                .setSocketTimeout(timeout * 1000)
                .build();
        return HttpClientBuilder.create()
                .setDefaultRequestConfig(config)
                .setConnectionManager(cm)
                .build();
    }
}

Thank you.
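
As the replies below point out, a bolt that extends BaseBasicBolt is acked
automatically, and the "replays" come from tuples that hit the topology
message timeout before the slow HTTP call completes. For comparison, here is
a minimal sketch of the explicit-ack style discussed further down the thread
(a BaseRichBolt with an anchored emit); the class name and the
callExternalService() placeholder are illustrative, not part of the real
project:

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class ExternalServiceRichBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        try {
            Object response = callExternalService(tuple);   // placeholder for the HttpClient call above
            collector.emit(tuple, new Values(response));     // anchored to the input tuple
            collector.ack(tuple);                            // explicit ack; a BaseBasicBolt does this for you
        } catch (Exception e) {
            collector.fail(tuple);                           // fail fast instead of waiting for the tuple timeout
        }
    }

    private Object callExternalService(Tuple tuple) {
        return null; // the HTTP logic from the bolt above would go here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("pushMessageResponse"));
    }
}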

Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
Hi,
I want to update you with the following:
Having those configurations:

config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 1000);
Config.setMaxSpoutPending(config,50);

config.put("TOPOLOGY_ACKER_EXECUTORS", 0);


and status from Storm-UI:

[image: inline screenshot of the Storm UI]


I am still getting failed tuples, but at a *much lower* rate than before. A
great improvement.

How can I decrease that number further? Replays are not good for our SLA,
because then some services in our topology are triggered twice. I need
exactly-once behavior.


1. Do you think I should keep "playing" with the params? Is it common
to have around 140 failed tuples for 500,000 messages?
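   (That is roughly 140 / 500,000 ≈ 0.03% of the messages.)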

Suggestions?

Thanks.
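
For reference, the same knobs can also be set through the typed helpers on
backtype.storm.Config when the topology is submitted; a minimal sketch, where
the topology name and the numbers are illustrative placeholders rather than
recommendations:

Config config = new Config();
config.setMessageTimeoutSecs(60);   // topology.message.timeout.secs, defaults to 30
config.setMaxSpoutPending(50);      // cap on unacked tuples per spout task
config.setNumAckers(1);             // topology.acker.executors; 0 disables acking and replays entirely
StormSubmitter.submitTopology("push-topology", config, builder.createTopology());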



2015-03-10 17:57 GMT+02:00 Nathan Leung <nc...@gmail.com>:

> Sometimes you get failed tuples in the beginning due to JVM warmup / other
> system initialization.  Wait to see if you continue to get failed tuples.
> If so try reducing max spout pending further (e.g. 50).
>
> On Tue, Mar 10, 2015 at 11:47 AM, Idan Fridman <id...@gmail.com>
> wrote:
>
>> Well bad news. after increasing to 100 I still see the failing number
>> increasing. it's getting bigger and bigger
>>
>> [image: inline screenshot of the Storm UI]
>>
>> 2015-03-10 17:29 GMT+02:00 Idan Fridman <id...@gmail.com>:
>>
>>> Gotcha. I gonna change it to 100. test with high throughput and watch
>>> the failing tuples. in case of improvements(or any other) ill notify here
>>> with status so others could benefit. thank you.
>>>
>>> 2015-03-10 17:26 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> I would keep the tuple timeout for now.  No need to change too many
>>>> parameters all at once.  If max spout pending of 100 can meet your
>>>> throughput goals and eliminate the failed tuples, then there is no need to
>>>> change the timeout too.
>>>
>>>
>>> On Tue, Mar 10, 2015 at 11:14 AM, Martin Illecker <mi...@apache.org>
>>> wrote:
>>>
>>>> I believe you can set the tuple timeout with *topology.message.timeout.secs
>>>> [1]*, which is by default 30 seconds.
>>>>
>>>> [1] https://github.com/apache/storm/blob/master/conf/defaults.yaml#L174
>>>>
>>>> 2015-03-10 16:08 GMT+01:00 Idan Fridman <id...@gmail.com>:
>>>>
>>>>> Ill start with 100.
>>>>> What about the tuple timeout param? I have no idea where(or how much)
>>>>> to set that one.
>>>>>
>>>>> 2015-03-10 17:05 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> try 100 and increase if necessary.
>>>>>
>>>>>
>>>>> On Tue, Mar 10, 2015 at 11:02 AM, Idan Fridman <id...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> @Martin which number would you suggest to start with?
>>>>>> what about increasing the time out? how you do that?
>>>>>> and is it possible to disable reply of tuples which caused by
>>>>>> timeouts? coz this is killing my machines here.
>>>>>>
>>>>>> 2015-03-10 17:00 GMT+02:00 Martin Illecker <mi...@apache.org>:
>>>>>>
>>>>>>> they're probably timed out.  you can try tweaking max spout pending
>>>>>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>>>>>
>>>>>>>
>>>>>>> As Nathan mentioned before, I would suggest decreasing the max spout
>>>>>>> pending.
>>>>>>> 5000 seems to be far too much for a high latency bolt.
>>>>>>>
>>>>>>>
>>>>>>> 2015-03-10 15:43 GMT+01:00 Idan Fridman <id...@gmail.com>:
>>>>>>>
>>>>>>>> I just attached another screenshot after 10 mins.
>>>>>>>>
>>>>>>>>  it's getting worser.
>>>>>>>> messages are being received in a loop. If I sent via kafka 200,000
>>>>>>>> messages the metrics shows I have received 400,000+ messages.
>>>>>>>>
>>>>>>>>  than I need to kill the topology and reset the kafka's offset else
>>>>>>>> when I bring it up again messages will continue to be consumed out of no
>>>>>>>> where.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> TopologyBuilder builder = new TopologyBuilder();                              //maximum number of tuples that can be unacked in at any given time
>>>>>>>> builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
>>>>>>>> builder
>>>>>>>>         .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
>>>>>>>>         .shuffleGrouping("push-notification-reader");
>>>>>>>> builder
>>>>>>>>         .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
>>>>>>>>         .fieldsGrouping("push-message-parser", new Fields("snid"));
>>>>>>>> builder
>>>>>>>>         .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
>>>>>>>>         .shuffleGrouping("deduplicator");
>>>>>>>> builder
>>>>>>>>         .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
>>>>>>>>         .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
>>>>>>>> builder
>>>>>>>>         .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
>>>>>>>>         .shuffleGrouping("push-to-ExternalService");
>>>>>>>> builder
>>>>>>>>         .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
>>>>>>>>         .shuffleGrouping("status-aggregator");
>>>>>>>> builder
>>>>>>>>         .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
>>>>>>>>         .shuffleGrouping("status-aggregator");
>>>>>>>> builder
>>>>>>>>         .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
>>>>>>>>         .shuffleGrouping("insightsExtractorBolt");
>>>>>>>>
>>>>>>>> return builder;
>>>>>>>>
>>>>>>>> Another screenshot:
>>>>>>>>
>>>>>>>> [image: inline screenshot of the Storm UI]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>
>>>>>>>>> they're probably timed out.  you can try tweaking max spout
>>>>>>>>> pending (what is its value in your topology?) or increasing the tuple
>>>>>>>>> timeout value.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <idan.frid@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> I attached storm-ui screenshot which I took in peak-time. I
>>>>>>>>> replaced the bolts names with numbers in order to be authentic
>>>>>>>>>
>>>>>>>>> I can see some failed bolts within the spout. Any idea?
>>>>>>>>>
>>>>>>>>> [image: inline screenshot of the Storm UI]
>>>>>>>>>
>>>>>>>>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> You don't need to ack when extending BaseBasicBolt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <
>>>>>>>>> haralds@evilezh.net> wrote:
>>>>>>>>>
>>>>>>>>>> try to replace: basicOutputCollector.emit(new
>>>>>>>>>> Values(pushMessageResponseDTO));
>>>>>>>>>> with
>>>>>>>>>>  basicOutputCollector.emit(tuple, new
>>>>>>>>>> Values(pushMessageResponseDTO));
>>>>>>>>>>             collector.ack(tuple);
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> And where do you ACK/FAIL tuples ?
>>>>>>>>>>>
>>>>>>>>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> By the metrics I can see some errors yes.
>>>>>>>>>>>>
>>>>>>>>>>>> but if I use try and catch why would they timout in a loop?
>>>>>>>>>>>> once they timeout i am catching them logging them and thats it
>>>>>>>>>>>>
>>>>>>>>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Do you have a large number of errored tuples in this topology?
>>>>>>>>>>>>> You might run into a situation where tuples timeout in a loop
>>>>>>>>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>>>>>>>>> webservice.
>>>>>>>>>>>>>> The average response is 500 milliseconds (how-ever sometimes
>>>>>>>>>>>>>> it takes longer)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When I send messages one by one everything working fine but
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Under High throughput *that bolt is getting stuck and
>>>>>>>>>>>>>> nothing get into there anymore.* and the worst thing I am
>>>>>>>>>>>>>> having a "reply" of the messages
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The only way to get thru this is to reset kafka's offset.
>>>>>>>>>>>>>> else the zookeeper still logging kafka's offset and messages are still
>>>>>>>>>>>>>> replying
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>>>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> package com.mycompany.push.topology;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>>>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>>>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>>>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>>>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>>>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>>>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>>>>>>>>> import backtype.storm.tuple.Values;
>>>>>>>>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>>>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>>>>>>>>> import org.apache.http.NameValuePair;
>>>>>>>>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>>>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>>>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>>>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>>>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>>>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>>>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>>>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>>>>>>>>> import org.slf4j.Logger;
>>>>>>>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> import java.io.IOException;
>>>>>>>>>>>>>> import java.net.SocketTimeoutException;
>>>>>>>>>>>>>> import java.util.ArrayList;
>>>>>>>>>>>>>> import java.util.List;
>>>>>>>>>>>>>> import java.util.Map;
>>>>>>>>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>>>>>>>>         initMetrics(context);
>>>>>>>>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>>>>>         try {
>>>>>>>>>>>>>>             received_message_counter.incr();
>>>>>>>>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>>>>>>>>         try {
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>>>>>>>>             response = httpClient.execute(post);
>>>>>>>>>>>>>>             response.getEntity();
>>>>>>>>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>>>>>>>>             } else {
>>>>>>>>>>>>>>                 received_status_200_counter.incr();
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>>>>>>>>             return pushMessageResponseDTO;
>>>>>>>>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>>>>>>>>             received_time_out_counter.incr();
>>>>>>>>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         } catch (Throwable t) {
>>>>>>>>>>>>>>             received_fail_status_counter.incr();
>>>>>>>>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>>>>>>>>             if (t.getMessage() != null) {
>>>>>>>>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>         } finally {
>>>>>>>>>>>>>>             if (response != null) {
>>>>>>>>>>>>>>                 response.close();
>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>         return pushMessageResponseDTO;
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>>>>>>>>         cm.setMaxTotal(500);
>>>>>>>>>>>>>>         int timeout = 4;
>>>>>>>>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>>>>>>>>         return HttpClientBuilder.create().
>>>>>>>>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>>>>>>>>                 setConnectionManager(cm).
>>>>>>>>>>>>>>                 build();
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Nathan Leung <nc...@gmail.com>.
Sometimes you get failed tuples in the beginning due to JVM warmup / other
system initialization.  Wait to see if you continue to get failed tuples.
If so try reducing max spout pending further (e.g. 50).


Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
Well, bad news: after increasing it to 100 I still see the number of failed
tuples increasing; it keeps getting bigger and bigger.

[image: inline screenshot of the Storm UI]


Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
Gotcha. I am going to change it to 100, test with high throughput, and watch
the failed tuples. If there is an improvement (or anything else worth noting)
I'll post a status here so others can benefit. Thank you.

>>>>>>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>>>>>>         try {
>>>>>>>>>>>>
>>>>>>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>>>>>>             response = httpClient.execute(post);
>>>>>>>>>>>>             response.getEntity();
>>>>>>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>>>>>>             } else {
>>>>>>>>>>>>                 received_status_200_counter.incr();
>>>>>>>>>>>>             }
>>>>>>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>>>>>>             return pushMessageResponseDTO;
>>>>>>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>>>>>>             received_time_out_counter.incr();
>>>>>>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>>>>>>
>>>>>>>>>>>>         } catch (Throwable t) {
>>>>>>>>>>>>             received_fail_status_counter.incr();
>>>>>>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>>>>>>             if (t.getMessage() != null) {
>>>>>>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>>>>>>             }
>>>>>>>>>>>>         } finally {
>>>>>>>>>>>>             if (response != null) {
>>>>>>>>>>>>                 response.close();
>>>>>>>>>>>>             }
>>>>>>>>>>>>         }
>>>>>>>>>>>>         return pushMessageResponseDTO;
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>>>>>>         cm.setMaxTotal(500);
>>>>>>>>>>>>         int timeout = 4;
>>>>>>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>>>>>>         return HttpClientBuilder.create().
>>>>>>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>>>>>>                 setConnectionManager(cm).
>>>>>>>>>>>>                 build();
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Nathan Leung <nc...@gmail.com>.
I would keep the tuple timeout as it is for now; no need to change too many
parameters at once.  If a max spout pending of 100 can meet your throughput
goals and eliminate the failed tuples, then there is no need to change the
timeout as well.
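
For anyone following along, here is a minimal sketch of where that knob
lives when submitting the topology. It assumes the backtype.storm 0.9.x
API used elsewhere in this thread; it is an illustration, not the
poster's actual submission code.

import backtype.storm.Config;

Config conf = new Config();
// Cap the number of unacked tuples per spout task
// (maps to topology.max.spout.pending).
conf.setMaxSpoutPending(100);
// The tuple timeout is deliberately left at its default here.
// conf would then be passed to StormSubmitter.submitTopology(...) together
// with builder.createTopology() when the topology is resubmitted.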

On Tue, Mar 10, 2015 at 11:14 AM, Martin Illecker <mi...@apache.org>
wrote:

> I believe you can set the tuple timeout with *topology.message.timeout.secs
> [1]*, which is by default 30 seconds.
>
> [1] https://github.com/apache/storm/blob/master/conf/defaults.yaml#L174
>
> 2015-03-10 16:08 GMT+01:00 Idan Fridman <id...@gmail.com>:
>
>> Ill start with 100.
>> What about the tuple timeout param? I have no idea where(or how much) to
>> set that one.
>>
>> 2015-03-10 17:05 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>
>>> try 100 and increase if necessary.
>>
>>
>> On Tue, Mar 10, 2015 at 11:02 AM, Idan Fridman <id...@gmail.com>
>> wrote:
>>
>>> @Martin which number would you suggest to start with?
>>> what about increasing the time out? how you do that?
>>> and is it possible to disable reply of tuples which caused by timeouts?
>>> coz this is killing my machines here.
>>>
>>> 2015-03-10 17:00 GMT+02:00 Martin Illecker <mi...@apache.org>:
>>>
>>>> they're probably timed out.  you can try tweaking max spout pending
>>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>>
>>>>
>>>> As Nathan mentioned before, I would suggest decreasing the max spout
>>>> pending.
>>>> 5000 seems to be far too much for a high latency bolt.
>>>>
>>>>
>>>> 2015-03-10 15:43 GMT+01:00 Idan Fridman <id...@gmail.com>:
>>>>
>>>>> I just attached another screenshot after 10 mins.
>>>>>
>>>>>  it's getting worser.
>>>>> messages are being received in a loop. If I sent via kafka 200,000
>>>>> messages the metrics shows I have received 400,000+ messages.
>>>>>
>>>>>  than I need to kill the topology and reset the kafka's offset else
>>>>> when I bring it up again messages will continue to be consumed out of no
>>>>> where.
>>>>>
>>>>>
>>>>>
>>>>> TopologyBuilder builder = new TopologyBuilder();                              //maximum number of tuples that can be unacked in at any given time
>>>>> builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
>>>>> builder
>>>>>         .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
>>>>>         .shuffleGrouping("push-notification-reader");
>>>>> builder
>>>>>         .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
>>>>>         .fieldsGrouping("push-message-parser", new Fields("snid"));
>>>>> builder
>>>>>         .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
>>>>>         .shuffleGrouping("deduplicator");
>>>>> builder
>>>>>         .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
>>>>>         .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
>>>>> builder
>>>>>         .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
>>>>>         .shuffleGrouping("push-to-ExternalService");
>>>>> builder
>>>>>         .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
>>>>>         .shuffleGrouping("status-aggregator");
>>>>> builder
>>>>>         .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
>>>>>         .shuffleGrouping("status-aggregator");
>>>>> builder
>>>>>         .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
>>>>>         .shuffleGrouping("insightsExtractorBolt");
>>>>>
>>>>> return builder;
>>>>>
>>>>> Another screenshot:
>>>>>
>>>>> [image: inline image 1]
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> they're probably timed out.  you can try tweaking max spout pending
>>>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>>>
>>>>>
>>>>> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I attached storm-ui screenshot which I took in peak-time. I replaced
>>>>>> the bolts names with numbers in order to be authentic
>>>>>>
>>>>>> I can see some failed bolts within the spout. Any idea?
>>>>>>
>>>>>> [image: inline image 1]
>>>>>>
>>>>>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>
>>>>>>> You don't need to ack when extending BaseBasicBolt
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <haralds@evilezh.net
>>>>>> > wrote:
>>>>>>
>>>>>>> try to replace: basicOutputCollector.emit(new
>>>>>>> Values(pushMessageResponseDTO));
>>>>>>> with
>>>>>>>  basicOutputCollector.emit(tuple, new
>>>>>>> Values(pushMessageResponseDTO));
>>>>>>>             collector.ack(tuple);
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> And where do you ACK/FAIL tuples ?
>>>>>>>>
>>>>>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> By the metrics I can see some errors yes.
>>>>>>>>>
>>>>>>>>> but if I use try and catch why would they timout in a loop? once
>>>>>>>>> they timeout i am catching them logging them and thats it
>>>>>>>>>
>>>>>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> Do you have a large number of errored tuples in this topology?
>>>>>>>>>> You might run into a situation where tuples timeout in a loop
>>>>>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>>>>>> webservice.
>>>>>>>>>>> The average response is 500 milliseconds (how-ever sometimes it
>>>>>>>>>>> takes longer)
>>>>>>>>>>>
>>>>>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>>>>>
>>>>>>>>>>> When I send messages one by one everything working fine but
>>>>>>>>>>>
>>>>>>>>>>> Under High throughput *that bolt is getting stuck and nothing
>>>>>>>>>>> get into there anymore.* and the worst thing I am having a
>>>>>>>>>>> "reply" of the messages
>>>>>>>>>>>
>>>>>>>>>>> The only way to get thru this is to reset kafka's offset. else
>>>>>>>>>>> the zookeeper still logging kafka's offset and messages are still replying
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>>>>>
>>>>>>>>>>> package com.mycompany.push.topology;
>>>>>>>>>>>
>>>>>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>>>>>> import backtype.storm.tuple.Values;
>>>>>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>>>>>> import org.apache.http.NameValuePair;
>>>>>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>>>>>> import org.slf4j.Logger;
>>>>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>>>>
>>>>>>>>>>> import java.io.IOException;
>>>>>>>>>>> import java.net.SocketTimeoutException;
>>>>>>>>>>> import java.util.ArrayList;
>>>>>>>>>>> import java.util.List;
>>>>>>>>>>> import java.util.Map;
>>>>>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>>>>>
>>>>>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>>>>>         initMetrics(context);
>>>>>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>     @Override
>>>>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>>         try {
>>>>>>>>>>>             received_message_counter.incr();
>>>>>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>>>>>             }
>>>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>>>>>         }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>>>>>         try {
>>>>>>>>>>>
>>>>>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>>>>>             response = httpClient.execute(post);
>>>>>>>>>>>             response.getEntity();
>>>>>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>>>>>             } else {
>>>>>>>>>>>                 received_status_200_counter.incr();
>>>>>>>>>>>             }
>>>>>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>>>>>             return pushMessageResponseDTO;
>>>>>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>>>>>             received_time_out_counter.incr();
>>>>>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>>>>>
>>>>>>>>>>>         } catch (Throwable t) {
>>>>>>>>>>>             received_fail_status_counter.incr();
>>>>>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>>>>>             if (t.getMessage() != null) {
>>>>>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>>>>>             }
>>>>>>>>>>>         } finally {
>>>>>>>>>>>             if (response != null) {
>>>>>>>>>>>                 response.close();
>>>>>>>>>>>             }
>>>>>>>>>>>         }
>>>>>>>>>>>         return pushMessageResponseDTO;
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>>>>>         cm.setMaxTotal(500);
>>>>>>>>>>>         int timeout = 4;
>>>>>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>>>>>         return HttpClientBuilder.create().
>>>>>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>>>>>                 setConnectionManager(cm).
>>>>>>>>>>>                 build();
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Thank you.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Martin Illecker <mi...@apache.org>.
I believe you can set the tuple timeout with *topology.message.timeout.secs
[1]*, which is 30 seconds by default.

[1] https://github.com/apache/storm/blob/master/conf/defaults.yaml#L174
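
For completeness, a small sketch of setting that key programmatically,
assuming the backtype.storm Config API used elsewhere in this thread;
the 60 second value is only an example, not a recommendation.

import backtype.storm.Config;

Config conf = new Config();
// Same key as topology.message.timeout.secs in defaults.yaml (30s default).
conf.setMessageTimeoutSecs(60);
// Equivalent form via the config constant:
// conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);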

2015-03-10 16:08 GMT+01:00 Idan Fridman <id...@gmail.com>:

> Ill start with 100.
> What about the tuple timeout param? I have no idea where(or how much) to
> set that one.
>
> 2015-03-10 17:05 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>
>> try 100 and increase if necessary.
>
>
> On Tue, Mar 10, 2015 at 11:02 AM, Idan Fridman <id...@gmail.com>
> wrote:
>
>> @Martin which number would you suggest to start with?
>> what about increasing the time out? how you do that?
>> and is it possible to disable reply of tuples which caused by timeouts?
>> coz this is killing my machines here.
>>
>> 2015-03-10 17:00 GMT+02:00 Martin Illecker <mi...@apache.org>:
>>
>>> they're probably timed out.  you can try tweaking max spout pending
>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>
>>>
>>> As Nathan mentioned before, I would suggest decreasing the max spout
>>> pending.
>>> 5000 seems to be far too much for a high latency bolt.
>>>
>>>
>>> 2015-03-10 15:43 GMT+01:00 Idan Fridman <id...@gmail.com>:
>>>
>>>> I just attached another screenshot after 10 mins.
>>>>
>>>>  it's getting worser.
>>>> messages are being received in a loop. If I sent via kafka 200,000
>>>> messages the metrics shows I have received 400,000+ messages.
>>>>
>>>>  than I need to kill the topology and reset the kafka's offset else
>>>> when I bring it up again messages will continue to be consumed out of no
>>>> where.
>>>>
>>>>
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();                              //maximum number of tuples that can be unacked in at any given time
>>>> builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
>>>> builder
>>>>         .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
>>>>         .shuffleGrouping("push-notification-reader");
>>>> builder
>>>>         .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
>>>>         .fieldsGrouping("push-message-parser", new Fields("snid"));
>>>> builder
>>>>         .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
>>>>         .shuffleGrouping("deduplicator");
>>>> builder
>>>>         .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
>>>>         .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
>>>> builder
>>>>         .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
>>>>         .shuffleGrouping("push-to-ExternalService");
>>>> builder
>>>>         .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
>>>>         .shuffleGrouping("status-aggregator");
>>>> builder
>>>>         .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
>>>>         .shuffleGrouping("status-aggregator");
>>>> builder
>>>>         .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
>>>>         .shuffleGrouping("insightsExtractorBolt");
>>>>
>>>> return builder;
>>>>
>>>> Another screenshot:
>>>>
>>>> [image: inline image 1]
>>>>
>>>>
>>>>
>>>>
>>>> 2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> they're probably timed out.  you can try tweaking max spout pending
>>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>>
>>>>
>>>> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com>
>>>> wrote:
>>>>
>>>>> I attached storm-ui screenshot which I took in peak-time. I replaced
>>>>> the bolts names with numbers in order to be authentic
>>>>>
>>>>> I can see some failed bolts within the spout. Any idea?
>>>>>
>>>>> [image: inline image 1]
>>>>>
>>>>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> You don't need to ack when extending BaseBasicBolt
>>>>>
>>>>>
>>>>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
>>>>> wrote:
>>>>>
>>>>>> try to replace: basicOutputCollector.emit(new
>>>>>> Values(pushMessageResponseDTO));
>>>>>> with
>>>>>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>>>>>             collector.ack(tuple);
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net>
>>>>>> wrote:
>>>>>>
>>>>>>> And where do you ACK/FAIL tuples ?
>>>>>>>
>>>>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>>>>>
>>>>>>>> By the metrics I can see some errors yes.
>>>>>>>>
>>>>>>>> but if I use try and catch why would they timout in a loop? once
>>>>>>>> they timeout i am catching them logging them and thats it
>>>>>>>>
>>>>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>
>>>>>>>>> Do you have a large number of errored tuples in this topology? You
>>>>>>>>> might run into a situation where tuples timeout in a loop
>>>>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>>>>> webservice.
>>>>>>>>>> The average response is 500 milliseconds (how-ever sometimes it
>>>>>>>>>> takes longer)
>>>>>>>>>>
>>>>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>>>>
>>>>>>>>>> When I send messages one by one everything working fine but
>>>>>>>>>>
>>>>>>>>>> Under High throughput *that bolt is getting stuck and nothing
>>>>>>>>>> get into there anymore.* and the worst thing I am having a
>>>>>>>>>> "reply" of the messages
>>>>>>>>>>
>>>>>>>>>> The only way to get thru this is to reset kafka's offset. else
>>>>>>>>>> the zookeeper still logging kafka's offset and messages are still replying
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>>>>
>>>>>>>>>> package com.mycompany.push.topology;
>>>>>>>>>>
>>>>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>>>>> import backtype.storm.tuple.Values;
>>>>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>>>>> import org.apache.http.NameValuePair;
>>>>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>>>>> import org.slf4j.Logger;
>>>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>>>
>>>>>>>>>> import java.io.IOException;
>>>>>>>>>> import java.net.SocketTimeoutException;
>>>>>>>>>> import java.util.ArrayList;
>>>>>>>>>> import java.util.List;
>>>>>>>>>> import java.util.Map;
>>>>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>>>>
>>>>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>>>>         initMetrics(context);
>>>>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>         try {
>>>>>>>>>>             received_message_counter.incr();
>>>>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>>>>             }
>>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>>>>         }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>>>>         try {
>>>>>>>>>>
>>>>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>>>>             response = httpClient.execute(post);
>>>>>>>>>>             response.getEntity();
>>>>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>>>>             } else {
>>>>>>>>>>                 received_status_200_counter.incr();
>>>>>>>>>>             }
>>>>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>>>>             return pushMessageResponseDTO;
>>>>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>>>>             received_time_out_counter.incr();
>>>>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>>>>
>>>>>>>>>>         } catch (Throwable t) {
>>>>>>>>>>             received_fail_status_counter.incr();
>>>>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>>>>             if (t.getMessage() != null) {
>>>>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>>>>             }
>>>>>>>>>>         } finally {
>>>>>>>>>>             if (response != null) {
>>>>>>>>>>                 response.close();
>>>>>>>>>>             }
>>>>>>>>>>         }
>>>>>>>>>>         return pushMessageResponseDTO;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>>>>         cm.setMaxTotal(500);
>>>>>>>>>>         int timeout = 4;
>>>>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>>>>         return HttpClientBuilder.create().
>>>>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>>>>                 setConnectionManager(cm).
>>>>>>>>>>                 build();
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Thank you.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
I'll start with 100.
What about the tuple timeout param? I have no idea where (or to what value)
to set that one.

2015-03-10 17:05 GMT+02:00 Nathan Leung <nc...@gmail.com>:

> try 100 and increase if necessary.
>
> On Tue, Mar 10, 2015 at 11:02 AM, Idan Fridman <id...@gmail.com>
> wrote:
>
>> @Martin which number would you suggest to start with?
>> what about increasing the time out? how you do that?
>> and is it possible to disable reply of tuples which caused by timeouts?
>> coz this is killing my machines here.
>>
>> 2015-03-10 17:00 GMT+02:00 Martin Illecker <mi...@apache.org>:
>>
>>> they're probably timed out.  you can try tweaking max spout pending
>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>
>>>
>>> As Nathan mentioned before, I would suggest decreasing the max spout
>>> pending.
>>> 5000 seems to be far too much for a high latency bolt.
>>>
>>>
>>> 2015-03-10 15:43 GMT+01:00 Idan Fridman <id...@gmail.com>:
>>>
>>>> I just attached another screenshot after 10 mins.
>>>>
>>>>  it's getting worser.
>>>> messages are being received in a loop. If I sent via kafka 200,000
>>>> messages the metrics shows I have received 400,000+ messages.
>>>>
>>>>  than I need to kill the topology and reset the kafka's offset else
>>>> when I bring it up again messages will continue to be consumed out of no
>>>> where.
>>>>
>>>>
>>>>
>>>> TopologyBuilder builder = new TopologyBuilder();                              //maximum number of tuples that can be unacked in at any given time
>>>> builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
>>>> builder
>>>>         .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
>>>>         .shuffleGrouping("push-notification-reader");
>>>> builder
>>>>         .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
>>>>         .fieldsGrouping("push-message-parser", new Fields("snid"));
>>>> builder
>>>>         .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
>>>>         .shuffleGrouping("deduplicator");
>>>> builder
>>>>         .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
>>>>         .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
>>>> builder
>>>>         .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
>>>>         .shuffleGrouping("push-to-ExternalService");
>>>> builder
>>>>         .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
>>>>         .shuffleGrouping("status-aggregator");
>>>> builder
>>>>         .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
>>>>         .shuffleGrouping("status-aggregator");
>>>> builder
>>>>         .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
>>>>         .shuffleGrouping("insightsExtractorBolt");
>>>>
>>>> return builder;
>>>>
>>>> Another screenshot:
>>>>
>>>> [image: inline image 1]
>>>>
>>>>
>>>>
>>>>
>>>> 2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> they're probably timed out.  you can try tweaking max spout pending
>>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>>
>>>>
>>>> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com>
>>>> wrote:
>>>>
>>>>> I attached storm-ui screenshot which I took in peak-time. I replaced
>>>>> the bolts names with numbers in order to be authentic
>>>>>
>>>>> I can see some failed bolts within the spout. Any idea?
>>>>>
>>>>> [image: inline image 1]
>>>>>
>>>>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> You don't need to ack when extending BaseBasicBolt
>>>>>
>>>>>
>>>>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
>>>>> wrote:
>>>>>
>>>>>> try to replace: basicOutputCollector.emit(new
>>>>>> Values(pushMessageResponseDTO));
>>>>>> with
>>>>>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>>>>>             collector.ack(tuple);
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net>
>>>>>> wrote:
>>>>>>
>>>>>>> And where do you ACK/FAIL tuples ?
>>>>>>>
>>>>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>>>>>
>>>>>>>> By the metrics I can see some errors yes.
>>>>>>>>
>>>>>>>> but if I use try and catch why would they timout in a loop? once
>>>>>>>> they timeout i am catching them logging them and thats it
>>>>>>>>
>>>>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>>
>>>>>>>>> Do you have a large number of errored tuples in this topology? You
>>>>>>>>> might run into a situation where tuples timeout in a loop
>>>>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>>>>> webservice.
>>>>>>>>>> The average response is 500 milliseconds (how-ever sometimes it
>>>>>>>>>> takes longer)
>>>>>>>>>>
>>>>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>>>>
>>>>>>>>>> When I send messages one by one everything working fine but
>>>>>>>>>>
>>>>>>>>>> Under High throughput *that bolt is getting stuck and nothing
>>>>>>>>>> get into there anymore.* and the worst thing I am having a
>>>>>>>>>> "reply" of the messages
>>>>>>>>>>
>>>>>>>>>> The only way to get thru this is to reset kafka's offset. else
>>>>>>>>>> the zookeeper still logging kafka's offset and messages are still replying
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>>>>
>>>>>>>>>> package com.mycompany.push.topology;
>>>>>>>>>>
>>>>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>>>>> import backtype.storm.tuple.Values;
>>>>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>>>>> import org.apache.http.NameValuePair;
>>>>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>>>>> import org.slf4j.Logger;
>>>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>>>
>>>>>>>>>> import java.io.IOException;
>>>>>>>>>> import java.net.SocketTimeoutException;
>>>>>>>>>> import java.util.ArrayList;
>>>>>>>>>> import java.util.List;
>>>>>>>>>> import java.util.Map;
>>>>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>>>>
>>>>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>>>>         initMetrics(context);
>>>>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>>         try {
>>>>>>>>>>             received_message_counter.incr();
>>>>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>>>>             }
>>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>>>>         }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>>>>         try {
>>>>>>>>>>
>>>>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>>>>             response = httpClient.execute(post);
>>>>>>>>>>             response.getEntity();
>>>>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>>>>             } else {
>>>>>>>>>>                 received_status_200_counter.incr();
>>>>>>>>>>             }
>>>>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>>>>             return pushMessageResponseDTO;
>>>>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>>>>             received_time_out_counter.incr();
>>>>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>>>>
>>>>>>>>>>         } catch (Throwable t) {
>>>>>>>>>>             received_fail_status_counter.incr();
>>>>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>>>>             if (t.getMessage() != null) {
>>>>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>>>>             }
>>>>>>>>>>         } finally {
>>>>>>>>>>             if (response != null) {
>>>>>>>>>>                 response.close();
>>>>>>>>>>             }
>>>>>>>>>>         }
>>>>>>>>>>         return pushMessageResponseDTO;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>>>>         cm.setMaxTotal(500);
>>>>>>>>>>         int timeout = 4;
>>>>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>>>>         return HttpClientBuilder.create().
>>>>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>>>>                 setConnectionManager(cm).
>>>>>>>>>>                 build();
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Thank you.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Nathan Leung <nc...@gmail.com>.
try 100 and increase if necessary.
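
Applied to the builder code posted earlier in the thread, that would be the
same chained call on the KafkaSpout declarer with 100 instead of 5000. This
is a sketch only; if I remember correctly the component-level value takes
precedence over the topology-wide setting, so it needs changing here too.

builder.setSpout("push-notification-reader",
        new KafkaSpout(initKafkaSpoutConfig()), 4)
        .setNumTasks(8)
        .setMaxSpoutPending(100);   // was 5000 in the posted topology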

On Tue, Mar 10, 2015 at 11:02 AM, Idan Fridman <id...@gmail.com> wrote:

> @Martin which number would you suggest to start with?
> what about increasing the time out? how you do that?
> and is it possible to disable reply of tuples which caused by timeouts?
> coz this is killing my machines here.
>
> 2015-03-10 17:00 GMT+02:00 Martin Illecker <mi...@apache.org>:
>
>> they're probably timed out.  you can try tweaking max spout pending (what
>>> is its value in your topology?) or increasing the tuple timeout value.
>>
>>
>> As Nathan mentioned before, I would suggest decreasing the max spout
>> pending.
>> 5000 seems to be far too much for a high latency bolt.
>>
>>
>> 2015-03-10 15:43 GMT+01:00 Idan Fridman <id...@gmail.com>:
>>
>>> I just attached another screenshot after 10 mins.
>>>
>>>  it's getting worser.
>>> messages are being received in a loop. If I sent via kafka 200,000
>>> messages the metrics shows I have received 400,000+ messages.
>>>
>>>  than I need to kill the topology and reset the kafka's offset else when
>>> I bring it up again messages will continue to be consumed out of no where.
>>>
>>>
>>>
>>> TopologyBuilder builder = new TopologyBuilder();                              //maximum number of tuples that can be unacked in at any given time
>>> builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
>>> builder
>>>         .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
>>>         .shuffleGrouping("push-notification-reader");
>>> builder
>>>         .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
>>>         .fieldsGrouping("push-message-parser", new Fields("snid"));
>>> builder
>>>         .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
>>>         .shuffleGrouping("deduplicator");
>>> builder
>>>         .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
>>>         .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
>>> builder
>>>         .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
>>>         .shuffleGrouping("push-to-ExternalService");
>>> builder
>>>         .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
>>>         .shuffleGrouping("status-aggregator");
>>> builder
>>>         .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
>>>         .shuffleGrouping("status-aggregator");
>>> builder
>>>         .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
>>>         .shuffleGrouping("insightsExtractorBolt");
>>>
>>> return builder;
>>>
>>> Another screenshot:
>>>
>>> [image: inline image 1]
>>>
>>>
>>>
>>>
>>> 2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> they're probably timed out.  you can try tweaking max spout pending
>>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>>
>>>
>>> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com>
>>> wrote:
>>>
>>>> I attached storm-ui screenshot which I took in peak-time. I replaced
>>>> the bolts names with numbers in order to be authentic
>>>>
>>>> I can see some failed bolts within the spout. Any idea?
>>>>
>>>> [image: inline image 1]
>>>>
>>>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> You don't need to ack when extending BaseBasicBolt
>>>>
>>>>
>>>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
>>>> wrote:
>>>>
>>>>> try to replace: basicOutputCollector.emit(new
>>>>> Values(pushMessageResponseDTO));
>>>>> with
>>>>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>>>>             collector.ack(tuple);
>>>>>
>>>>>
>>>>>
>>>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net>
>>>>> wrote:
>>>>>
>>>>>> And where do you ACK/FAIL tuples ?
>>>>>>
>>>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>>>>
>>>>>>> By the metrics I can see some errors yes.
>>>>>>>
>>>>>>> but if I use try and catch why would they timout in a loop? once
>>>>>>> they timeout i am catching them logging them and thats it
>>>>>>>
>>>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>>
>>>>>>>> Do you have a large number of errored tuples in this topology? You
>>>>>>>> might run into a situation where tuples timeout in a loop
>>>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>>>> webservice.
>>>>>>>>> The average response is 500 milliseconds (how-ever sometimes it
>>>>>>>>> takes longer)
>>>>>>>>>
>>>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>>>
>>>>>>>>> When I send messages one by one everything working fine but
>>>>>>>>>
>>>>>>>>> Under High throughput *that bolt is getting stuck and nothing get
>>>>>>>>> into there anymore.* and the worst thing I am having a "reply" of
>>>>>>>>> the messages
>>>>>>>>>
>>>>>>>>> The only way to get thru this is to reset kafka's offset. else the
>>>>>>>>> zookeeper still logging kafka's offset and messages are still replying
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>>>
>>>>>>>>> package com.mycompany.push.topology;
>>>>>>>>>
>>>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>>>> import backtype.storm.tuple.Values;
>>>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>>>> import org.apache.http.NameValuePair;
>>>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>>>> import org.slf4j.Logger;
>>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>>
>>>>>>>>> import java.io.IOException;
>>>>>>>>> import java.net.SocketTimeoutException;
>>>>>>>>> import java.util.ArrayList;
>>>>>>>>> import java.util.List;
>>>>>>>>> import java.util.Map;
>>>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>>>
>>>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>>>         initMetrics(context);
>>>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>>         try {
>>>>>>>>>             received_message_counter.incr();
>>>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>>>             }
>>>>>>>>>         } catch (Exception e) {
>>>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>>>         }
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>>>         try {
>>>>>>>>>
>>>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>>>             response = httpClient.execute(post);
>>>>>>>>>             response.getEntity();
>>>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>>>             } else {
>>>>>>>>>                 received_status_200_counter.incr();
>>>>>>>>>             }
>>>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>>>             return pushMessageResponseDTO;
>>>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>>>             received_time_out_counter.incr();
>>>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>>>
>>>>>>>>>         } catch (Throwable t) {
>>>>>>>>>             received_fail_status_counter.incr();
>>>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>>>             if (t.getMessage() != null) {
>>>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>>>             }
>>>>>>>>>         } finally {
>>>>>>>>>             if (response != null) {
>>>>>>>>>                 response.close();
>>>>>>>>>             }
>>>>>>>>>         }
>>>>>>>>>         return pushMessageResponseDTO;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>>>         cm.setMaxTotal(500);
>>>>>>>>>         int timeout = 4;
>>>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>>>         return HttpClientBuilder.create().
>>>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>>>                 setConnectionManager(cm).
>>>>>>>>>                 build();
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Thank you.
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
@Martin, which number would you suggest starting with?
What about increasing the timeout? How do you do that?
And is it possible to disable the replay of tuples caused by timeouts? This
is killing my machines here.

2015-03-10 17:00 GMT+02:00 Martin Illecker <mi...@apache.org>:

> they're probably timed out.  you can try tweaking max spout pending (what
>> is its value in your topology?) or increasing the tuple timeout value.
>
>
> As Nathan mentioned before, I would suggest decreasing the max spout
> pending.
> 5000 seems to be far too much for a high latency bolt.
>
>
> 2015-03-10 15:43 GMT+01:00 Idan Fridman <id...@gmail.com>:
>
>> I just attached another screenshot after 10 mins.
>>
>>  it's getting worser.
>> messages are being received in a loop. If I sent via kafka 200,000
>> messages the metrics shows I have received 400,000+ messages.
>>
>>  than I need to kill the topology and reset the kafka's offset else when
>> I bring it up again messages will continue to be consumed out of no where.
>>
>>
>>
>> TopologyBuilder builder = new TopologyBuilder();                              //maximum number of tuples that can be unacked in at any given time
>> builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
>> builder
>>         .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
>>         .shuffleGrouping("push-notification-reader");
>> builder
>>         .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
>>         .fieldsGrouping("push-message-parser", new Fields("snid"));
>> builder
>>         .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
>>         .shuffleGrouping("deduplicator");
>> builder
>>         .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
>>         .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
>> builder
>>         .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
>>         .shuffleGrouping("push-to-ExternalService");
>> builder
>>         .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
>>         .shuffleGrouping("status-aggregator");
>> builder
>>         .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
>>         .shuffleGrouping("status-aggregator");
>> builder
>>         .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
>>         .shuffleGrouping("insightsExtractorBolt");
>>
>> return builder;
>>
>> Another screenshot:
>>
>> [image: תמונה מוטבעת 1]
>>
>>
>>
>>
>> 2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>
>>> they're probably timed out.  you can try tweaking max spout pending
>>> (what is its value in your topology?) or increasing the tuple timeout value.
>>
>>
>> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com>
>> wrote:
>>
>>> I attached storm-ui screenshot which I took in peak-time. I replaced the
>>> bolts names with numbers in order to be authentic
>>>
>>> I can see some failed bolts within the spout. Any idea?
>>>
>>> [image: תמונה מוטבעת 1]
>>>
>>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> You don't need to ack when extending BaseBasicBolt
>>>
>>>
>>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
>>> wrote:
>>>
>>>> try to replace: basicOutputCollector.emit(new
>>>> Values(pushMessageResponseDTO));
>>>> with
>>>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>>>             collector.ack(tuple);
>>>>
>>>>
>>>>
>>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net> wrote:
>>>>
>>>>> And where do you ACK/FAIL tuples ?
>>>>>
>>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>>>
>>>>>> By the metrics I can see some errors yes.
>>>>>>
>>>>>> but if I use try and catch why would they timout in a loop? once they
>>>>>> timeout i am catching them logging them and thats it
>>>>>>
>>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>>
>>>>>>> Do you have a large number of errored tuples in this topology? You
>>>>>>> might run into a situation where tuples timeout in a loop
>>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com> wrote:
>>>>>>>
>>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>>> webservice.
>>>>>>>> The average response is 500 milliseconds (how-ever sometimes it
>>>>>>>> takes longer)
>>>>>>>>
>>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>>
>>>>>>>> When I send messages one by one everything working fine but
>>>>>>>>
>>>>>>>> Under High throughput *that bolt is getting stuck and nothing get
>>>>>>>> into there anymore.* and the worst thing I am having a "reply" of
>>>>>>>> the messages
>>>>>>>>
>>>>>>>> The only way to get thru this is to reset kafka's offset. else the
>>>>>>>> zookeeper still logging kafka's offset and messages are still replying
>>>>>>>>
>>>>>>>>
>>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>>
>>>>>>>> package com.mycompany.push.topology;
>>>>>>>>
>>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>>> import backtype.storm.tuple.Values;
>>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>>> import org.apache.http.NameValuePair;
>>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>>> import org.slf4j.Logger;
>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>
>>>>>>>> import java.io.IOException;
>>>>>>>> import java.net.SocketTimeoutException;
>>>>>>>> import java.util.ArrayList;
>>>>>>>> import java.util.List;
>>>>>>>> import java.util.Map;
>>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>>
>>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>>
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>>     }
>>>>>>>>
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>>         initMetrics(context);
>>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>>     }
>>>>>>>>
>>>>>>>>
>>>>>>>>     @Override
>>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>>         try {
>>>>>>>>             received_message_counter.incr();
>>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>>             }
>>>>>>>>         } catch (Exception e) {
>>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>>         }
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>>         try {
>>>>>>>>
>>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>>             response = httpClient.execute(post);
>>>>>>>>             response.getEntity();
>>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>>             } else {
>>>>>>>>                 received_status_200_counter.incr();
>>>>>>>>             }
>>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>>             return pushMessageResponseDTO;
>>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>>             received_time_out_counter.incr();
>>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>>
>>>>>>>>         } catch (Throwable t) {
>>>>>>>>             received_fail_status_counter.incr();
>>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>>             if (t.getMessage() != null) {
>>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>>             }
>>>>>>>>         } finally {
>>>>>>>>             if (response != null) {
>>>>>>>>                 response.close();
>>>>>>>>             }
>>>>>>>>         }
>>>>>>>>         return pushMessageResponseDTO;
>>>>>>>>     }
>>>>>>>>
>>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>>         cm.setMaxTotal(500);
>>>>>>>>         int timeout = 4;
>>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>>         return HttpClientBuilder.create().
>>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>>                 setConnectionManager(cm).
>>>>>>>>                 build();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Thank you.
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Martin Illecker <mi...@apache.org>.
>
> they're probably timed out.  you can try tweaking max spout pending (what
> is its value in your topology?) or increasing the tuple timeout value.


As Nathan mentioned before, I would suggest decreasing the max spout
pending.
5000 seems to be far too much for a high-latency bolt.
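
For example (just a sketch; the right number depends on the real latency and
parallelism of your HTTP bolt), the spout line from your topology could drop
from 5000 to a few hundred:

builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4)
       .setNumTasks(8)
       .setMaxSpoutPending(500);   // was 5000; keep the in-flight backlog small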


2015-03-10 15:43 GMT+01:00 Idan Fridman <id...@gmail.com>:

> I just attached another screenshot after 10 mins.
>
>  it's getting worser.
> messages are being received in a loop. If I sent via kafka 200,000
> messages the metrics shows I have received 400,000+ messages.
>
>  than I need to kill the topology and reset the kafka's offset else when I
> bring it up again messages will continue to be consumed out of no where.
>
>
>
> TopologyBuilder builder = new TopologyBuilder();                              //maximum number of tuples that can be unacked in at any given time
> builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
> builder
>         .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
>         .shuffleGrouping("push-notification-reader");
> builder
>         .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
>         .fieldsGrouping("push-message-parser", new Fields("snid"));
> builder
>         .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
>         .shuffleGrouping("deduplicator");
> builder
>         .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
>         .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
> builder
>         .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
>         .shuffleGrouping("push-to-ExternalService");
> builder
>         .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
>         .shuffleGrouping("status-aggregator");
> builder
>         .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
>         .shuffleGrouping("status-aggregator");
> builder
>         .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
>         .shuffleGrouping("insightsExtractorBolt");
>
> return builder;
>
> Another screenshot:
>
> [image: תמונה מוטבעת 1]
>
>
>
>
> 2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>
>> they're probably timed out.  you can try tweaking max spout pending (what
>> is its value in your topology?) or increasing the tuple timeout value.
>
>
> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com>
> wrote:
>
>> I attached storm-ui screenshot which I took in peak-time. I replaced the
>> bolts names with numbers in order to be authentic
>>
>> I can see some failed bolts within the spout. Any idea?
>>
>> [image: תמונה מוטבעת 1]
>>
>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>
>>> You don't need to ack when extending BaseBasicBolt
>>
>>
>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
>> wrote:
>>
>>> try to replace: basicOutputCollector.emit(new
>>> Values(pushMessageResponseDTO));
>>> with
>>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>>             collector.ack(tuple);
>>>
>>>
>>>
>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net> wrote:
>>>
>>>> And where do you ACK/FAIL tuples ?
>>>>
>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>>
>>>>> By the metrics I can see some errors yes.
>>>>>
>>>>> but if I use try and catch why would they timout in a loop? once they
>>>>> timeout i am catching them logging them and thats it
>>>>>
>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> Do you have a large number of errored tuples in this topology? You
>>>>>> might run into a situation where tuples timeout in a loop
>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com> wrote:
>>>>>>
>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>> webservice.
>>>>>>> The average response is 500 milliseconds (how-ever sometimes it
>>>>>>> takes longer)
>>>>>>>
>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>
>>>>>>> When I send messages one by one everything working fine but
>>>>>>>
>>>>>>> Under High throughput *that bolt is getting stuck and nothing get
>>>>>>> into there anymore.* and the worst thing I am having a "reply" of
>>>>>>> the messages
>>>>>>>
>>>>>>> The only way to get thru this is to reset kafka's offset. else the
>>>>>>> zookeeper still logging kafka's offset and messages are still replying
>>>>>>>
>>>>>>>
>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>
>>>>>>> package com.mycompany.push.topology;
>>>>>>>
>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>> import backtype.storm.tuple.Values;
>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>> import org.apache.http.NameValuePair;
>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>> import org.slf4j.Logger;
>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>
>>>>>>> import java.io.IOException;
>>>>>>> import java.net.SocketTimeoutException;
>>>>>>> import java.util.ArrayList;
>>>>>>> import java.util.List;
>>>>>>> import java.util.Map;
>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>
>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>         initMetrics(context);
>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>         try {
>>>>>>>             received_message_counter.incr();
>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>             }
>>>>>>>         } catch (Exception e) {
>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>         try {
>>>>>>>
>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>             response = httpClient.execute(post);
>>>>>>>             response.getEntity();
>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>             } else {
>>>>>>>                 received_status_200_counter.incr();
>>>>>>>             }
>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>             return pushMessageResponseDTO;
>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>             received_time_out_counter.incr();
>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>
>>>>>>>         } catch (Throwable t) {
>>>>>>>             received_fail_status_counter.incr();
>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>             if (t.getMessage() != null) {
>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>             }
>>>>>>>         } finally {
>>>>>>>             if (response != null) {
>>>>>>>                 response.close();
>>>>>>>             }
>>>>>>>         }
>>>>>>>         return pushMessageResponseDTO;
>>>>>>>     }
>>>>>>>
>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>         cm.setMaxTotal(500);
>>>>>>>         int timeout = 4;
>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>         return HttpClientBuilder.create().
>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>                 setConnectionManager(cm).
>>>>>>>                 build();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
I just attached another screenshot after 10 minutes.

It's getting worse.
Messages are being received in a loop. If I send 200,000 messages via Kafka,
the metrics show I have received 400,000+ messages.

Then I need to kill the topology and reset Kafka's offset, or else when I
bring it up again messages will continue to be consumed out of nowhere.



TopologyBuilder builder = new TopologyBuilder();
// maximum number of tuples that can be unacked at any given time
builder.setSpout("push-notification-reader", new KafkaSpout(initKafkaSpoutConfig()), 4).setNumTasks(8).setMaxSpoutPending(5000);
builder
        .setBolt("push-message-parser", new MessageExtractorBolt(), 16).setNumTasks(32)
        .shuffleGrouping("push-notification-reader");
builder
        .setBolt("deduplicator", new DeduplicatorBolt(), 16).setNumTasks(32)
        .fieldsGrouping("push-message-parser", new Fields("snid"));
builder
        .setBolt("push-to-ExternalService", new ExternalServiceOutputBolt(), 32).setNumTasks(64)
        .shuffleGrouping("deduplicator");
builder
        .setBolt("status-aggregator", new StatusAggregatorBolt(), 1).setNumTasks(1)
        .shuffleGrouping("push-to-ExternalService").addConfigurations(statusConfig);
builder
        .setBolt("kafkaoutput", new MessageTrackerBolt(), 16).setNumTasks(64).addConfigurations(getKafkaBoltConfig())
        .shuffleGrouping("push-to-ExternalService");
builder
        .setBolt("couchbase-service", new CouchbaseOutputBolt(), 1).setNumTasks(1)
        .shuffleGrouping("status-aggregator");
builder
        .setBolt("insightsExtractorBolt", new InsightsExtractorBolt(), 8).setNumTasks(8)
        .shuffleGrouping("status-aggregator");
builder
        .setBolt("insightsOutPutBolt", new InsightsOutputBolt(), 8).setNumTasks(16).addConfigurations(getKafkaBoltConfig())
        .shuffleGrouping("insightsExtractorBolt");

return builder;
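
Rough arithmetic on those numbers (assuming ~500 ms per external call, the
default 30-second tuple timeout, and that max spout pending applies per spout
task, as far as I understand): with 8 spout tasks, up to 8 x 5,000 = 40,000
tuples can be in flight at once; 32 ExternalService executors at ~0.5 s each
drain about 64 tuples/s, so the tail of that backlog waits roughly
40,000 / 64 = 625 seconds, far beyond 30 s. Could that be why tuples time out
while still queued, get replayed by the spout, and the received count roughly
doubles?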

Another screenshot:

[image: inline image 1]




2015-03-10 16:33 GMT+02:00 Nathan Leung <nc...@gmail.com>:

> they're probably timed out.  you can try tweaking max spout pending (what
> is its value in your topology?) or increasing the tuple timeout value.
>
> On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com>
> wrote:
>
>> I attached storm-ui screenshot which I took in peak-time. I replaced the
>> bolts names with numbers in order to be authentic
>>
>> I can see some failed bolts within the spout. Any idea?
>>
>> [image: תמונה מוטבעת 1]
>>
>> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>
>>> You don't need to ack when extending BaseBasicBolt
>>
>>
>> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
>> wrote:
>>
>>> try to replace: basicOutputCollector.emit(new
>>> Values(pushMessageResponseDTO));
>>> with
>>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>>             collector.ack(tuple);
>>>
>>>
>>>
>>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net> wrote:
>>>
>>>> And where do you ACK/FAIL tuples ?
>>>>
>>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>>
>>>>> By the metrics I can see some errors yes.
>>>>>
>>>>> but if I use try and catch why would they timout in a loop? once they
>>>>> timeout i am catching them logging them and thats it
>>>>>
>>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>>
>>>>>> Do you have a large number of errored tuples in this topology? You
>>>>>> might run into a situation where tuples timeout in a loop
>>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com> wrote:
>>>>>>
>>>>>>> My Topology including a bolt which opening Http Request to
>>>>>>> webservice.
>>>>>>> The average response is 500 milliseconds (how-ever sometimes it
>>>>>>> takes longer)
>>>>>>>
>>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>>
>>>>>>> When I send messages one by one everything working fine but
>>>>>>>
>>>>>>> Under High throughput *that bolt is getting stuck and nothing get
>>>>>>> into there anymore.* and the worst thing I am having a "reply" of
>>>>>>> the messages
>>>>>>>
>>>>>>> The only way to get thru this is to reset kafka's offset. else the
>>>>>>> zookeeper still logging kafka's offset and messages are still replying
>>>>>>>
>>>>>>>
>>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>>
>>>>>>> package com.mycompany.push.topology;
>>>>>>>
>>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>>> import backtype.storm.task.TopologyContext;
>>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>>> import backtype.storm.tuple.Fields;
>>>>>>> import backtype.storm.tuple.Tuple;
>>>>>>> import backtype.storm.tuple.Values;
>>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>>> import org.apache.http.NameValuePair;
>>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>>> import org.slf4j.Logger;
>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>
>>>>>>> import java.io.IOException;
>>>>>>> import java.net.SocketTimeoutException;
>>>>>>> import java.util.ArrayList;
>>>>>>> import java.util.List;
>>>>>>> import java.util.Map;
>>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>>
>>>>>>>     private CloseableHttpClient httpClient;
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>>         initMetrics(context);
>>>>>>>         httpClient = getHttpClientInstance();
>>>>>>>     }
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>>         try {
>>>>>>>             received_message_counter.incr();
>>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>>             }
>>>>>>>         } catch (Exception e) {
>>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>>         }
>>>>>>>     }
>>>>>>>
>>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>>         CloseableHttpResponse response = null;
>>>>>>>         try {
>>>>>>>
>>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>>             response = httpClient.execute(post);
>>>>>>>             response.getEntity();
>>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>>                 received_not_status_200_counter.incr();
>>>>>>>             } else {
>>>>>>>                 received_status_200_counter.incr();
>>>>>>>             }
>>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>>             return pushMessageResponseDTO;
>>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>>             received_time_out_counter.incr();
>>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>>
>>>>>>>         } catch (Throwable t) {
>>>>>>>             received_fail_status_counter.incr();
>>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>>             if (t.getMessage() != null) {
>>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>>             }
>>>>>>>         } finally {
>>>>>>>             if (response != null) {
>>>>>>>                 response.close();
>>>>>>>             }
>>>>>>>         }
>>>>>>>         return pushMessageResponseDTO;
>>>>>>>     }
>>>>>>>
>>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>>         cm.setMaxTotal(500);
>>>>>>>         int timeout = 4;
>>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>>         return HttpClientBuilder.create().
>>>>>>>                 setDefaultRequestConfig(config).
>>>>>>>                 setConnectionManager(cm).
>>>>>>>                 build();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> Thank you.
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Nathan Leung <nc...@gmail.com>.
They're probably timed out. You can try tweaking max spout pending (what is
its value in your topology?) or increasing the tuple timeout value.
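
Something like this (a sketch only -- tune the numbers to your actual latency;
the topology name is a placeholder):

import backtype.storm.Config;
import backtype.storm.StormSubmitter;

Config conf = new Config();
conf.setMaxSpoutPending(500);     // topology.max.spout.pending, per spout task
conf.setMessageTimeoutSecs(60);   // topology.message.timeout.secs, default 30
StormSubmitter.submitTopology("push-topology", conf, builder.createTopology());

Note that if you have also set max spout pending directly on the spout
declarer, that component-level value takes precedence over the topology-level
one, so lower it there as well.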

On Tue, Mar 10, 2015 at 10:15 AM, Idan Fridman <id...@gmail.com> wrote:

> I attached storm-ui screenshot which I took in peak-time. I replaced the
> bolts names with numbers in order to be authentic
>
> I can see some failed bolts within the spout. Any idea?
>
> [image: תמונה מוטבעת 1]
>
> 2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>
>> You don't need to ack when extending BaseBasicBolt
>
>
> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
> wrote:
>
>> try to replace: basicOutputCollector.emit(new
>> Values(pushMessageResponseDTO));
>> with
>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>             collector.ack(tuple);
>>
>>
>>
>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net> wrote:
>>
>>> And where do you ACK/FAIL tuples ?
>>>
>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>
>>>> By the metrics I can see some errors yes.
>>>>
>>>> but if I use try and catch why would they timout in a loop? once they
>>>> timeout i am catching them logging them and thats it
>>>>
>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> Do you have a large number of errored tuples in this topology? You
>>>>> might run into a situation where tuples timeout in a loop
>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com> wrote:
>>>>>
>>>>>> My Topology including a bolt which opening Http Request to
>>>>>> webservice.
>>>>>> The average response is 500 milliseconds (how-ever sometimes it takes
>>>>>> longer)
>>>>>>
>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>
>>>>>> When I send messages one by one everything working fine but
>>>>>>
>>>>>> Under High throughput *that bolt is getting stuck and nothing get
>>>>>> into there anymore.* and the worst thing I am having a "reply" of
>>>>>> the messages
>>>>>>
>>>>>> The only way to get thru this is to reset kafka's offset. else the
>>>>>> zookeeper still logging kafka's offset and messages are still replying
>>>>>>
>>>>>>
>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>
>>>>>> package com.mycompany.push.topology;
>>>>>>
>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Tuple;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>> import org.apache.http.NameValuePair;
>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>> import org.slf4j.Logger;
>>>>>> import org.slf4j.LoggerFactory;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.net.SocketTimeoutException;
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.List;
>>>>>> import java.util.Map;
>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>
>>>>>>
>>>>>>
>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>
>>>>>>     private CloseableHttpClient httpClient;
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>         initMetrics(context);
>>>>>>         httpClient = getHttpClientInstance();
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>         try {
>>>>>>             received_message_counter.incr();
>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>             }
>>>>>>         } catch (Exception e) {
>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>         CloseableHttpResponse response = null;
>>>>>>         try {
>>>>>>
>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>             response = httpClient.execute(post);
>>>>>>             response.getEntity();
>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>                 received_not_status_200_counter.incr();
>>>>>>             } else {
>>>>>>                 received_status_200_counter.incr();
>>>>>>             }
>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>             return pushMessageResponseDTO;
>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>             received_time_out_counter.incr();
>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>
>>>>>>         } catch (Throwable t) {
>>>>>>             received_fail_status_counter.incr();
>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>             if (t.getMessage() != null) {
>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>             }
>>>>>>         } finally {
>>>>>>             if (response != null) {
>>>>>>                 response.close();
>>>>>>             }
>>>>>>         }
>>>>>>         return pushMessageResponseDTO;
>>>>>>     }
>>>>>>
>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>         cm.setMaxTotal(500);
>>>>>>         int timeout = 4;
>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>         return HttpClientBuilder.create().
>>>>>>                 setDefaultRequestConfig(config).
>>>>>>                 setConnectionManager(cm).
>>>>>>                 build();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
I attached a Storm UI screenshot which I took at peak time. I replaced the
bolt names with numbers to keep them anonymous.

I can see some failed tuples reported on the spout. Any idea?

[image: inline image 1]

2015-03-10 15:49 GMT+02:00 Nathan Leung <nc...@gmail.com>:

> You don't need to ack when extending BaseBasicBolt
>
> On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
> wrote:
>
>> try to replace: basicOutputCollector.emit(new
>> Values(pushMessageResponseDTO));
>> with
>>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>>             collector.ack(tuple);
>>
>>
>>
>> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net> wrote:
>>
>>> And where do you ACK/FAIL tuples ?
>>>
>>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>>
>>>> By the metrics I can see some errors yes.
>>>>
>>>> but if I use try and catch why would they timout in a loop? once they
>>>> timeout i am catching them logging them and thats it
>>>>
>>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>>
>>>>> Do you have a large number of errored tuples in this topology? You
>>>>> might run into a situation where tuples timeout in a loop
>>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com> wrote:
>>>>>
>>>>>> My Topology including a bolt which opening Http Request to
>>>>>> webservice.
>>>>>> The average response is 500 milliseconds (how-ever sometimes it takes
>>>>>> longer)
>>>>>>
>>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>>
>>>>>> When I send messages one by one everything working fine but
>>>>>>
>>>>>> Under High throughput *that bolt is getting stuck and nothing get
>>>>>> into there anymore.* and the worst thing I am having a "reply" of
>>>>>> the messages
>>>>>>
>>>>>> The only way to get thru this is to reset kafka's offset. else the
>>>>>> zookeeper still logging kafka's offset and messages are still replying
>>>>>>
>>>>>>
>>>>>> 1. *Why Messages being replied? I dont need that*
>>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>>
>>>>>> package com.mycompany.push.topology;
>>>>>>
>>>>>> import backtype.storm.metric.api.CountMetric;
>>>>>> import backtype.storm.task.TopologyContext;
>>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>>> import backtype.storm.tuple.Fields;
>>>>>> import backtype.storm.tuple.Tuple;
>>>>>> import backtype.storm.tuple.Values;
>>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>>> import org.apache.http.NameValuePair;
>>>>>> import org.apache.http.client.config.RequestConfig;
>>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>>> import org.apache.http.client.methods.HttpPost;
>>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>>> import org.slf4j.Logger;
>>>>>> import org.slf4j.LoggerFactory;
>>>>>>
>>>>>> import java.io.IOException;
>>>>>> import java.net.SocketTimeoutException;
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.List;
>>>>>> import java.util.Map;
>>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>>
>>>>>>
>>>>>>
>>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>>
>>>>>>     private CloseableHttpClient httpClient;
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>>         initMetrics(context);
>>>>>>         httpClient = getHttpClientInstance();
>>>>>>     }
>>>>>>
>>>>>>
>>>>>>     @Override
>>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>>         try {
>>>>>>             received_message_counter.incr();
>>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>>             if (pushMessageRequestDTO != null) {
>>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>>                 returned_from_externalService_counter.incr();
>>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>>             }
>>>>>>         } catch (Exception e) {
>>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>>         CloseableHttpResponse response = null;
>>>>>>         try {
>>>>>>
>>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>>             response = httpClient.execute(post);
>>>>>>             response.getEntity();
>>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>>                 received_not_status_200_counter.incr();
>>>>>>             } else {
>>>>>>                 received_status_200_counter.incr();
>>>>>>             }
>>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>>             return pushMessageResponseDTO;
>>>>>>         } catch (SocketTimeoutException e) {
>>>>>>             received_time_out_counter.incr();
>>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>>
>>>>>>         } catch (Throwable t) {
>>>>>>             received_fail_status_counter.incr();
>>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>>             if (t.getMessage() != null) {
>>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>>             }
>>>>>>         } finally {
>>>>>>             if (response != null) {
>>>>>>                 response.close();
>>>>>>             }
>>>>>>         }
>>>>>>         return pushMessageResponseDTO;
>>>>>>     }
>>>>>>
>>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>>         cm.setMaxTotal(500);
>>>>>>         int timeout = 4;
>>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>>         return HttpClientBuilder.create().
>>>>>>                 setDefaultRequestConfig(config).
>>>>>>                 setConnectionManager(cm).
>>>>>>                 build();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> Thank you.
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Nathan Leung <nc...@gmail.com>.
You don't need to ack when extending BaseBasicBolt
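
Roughly the difference (a sketch only; the class names, field names, and the
pass-through logic are placeholders, not your code):

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// BaseBasicBolt: emits through BasicOutputCollector are anchored to the input
// tuple for you, and the tuple is acked automatically when execute() returns
// (or failed if a FailedException escapes). No manual ack calls.
class BasicStyleBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        collector.emit(new Values(input.getValue(0)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}

// BaseRichBolt: anchoring and acking are explicit and entirely up to you.
class RichStyleBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        collector.emit(input, new Values(input.getValue(0)));  // anchor to the input
        collector.ack(input);                                  // ack (or fail) yourself
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}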

On Tue, Mar 10, 2015 at 9:47 AM, Haralds Ulmanis <ha...@evilezh.net>
wrote:

> try to replace: basicOutputCollector.emit(new
> Values(pushMessageResponseDTO));
> with
>  basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
>             collector.ack(tuple);
>
>
>
> On 10 March 2015 at 13:43, Haralds Ulmanis <ha...@evilezh.net> wrote:
>
>> And where do you ACK/FAIL tuples ?
>>
>> On 10 March 2015 at 13:39, Idan Fridman <id...@gmail.com> wrote:
>>
>>> By the metrics I can see some errors yes.
>>>
>>> but if I use try and catch why would they timout in a loop? once they
>>> timeout i am catching them logging them and thats it
>>>
>>> 2015-03-10 15:35 GMT+02:00 Nathan Leung <nc...@gmail.com>:
>>>
>>>> Do you have a large number of errored tuples in this topology? You
>>>> might run into a situation where tuples timeout in a loop
>>>> On Mar 10, 2015 8:58 AM, "Idan Fridman" <id...@gmail.com> wrote:
>>>>
>>>>> My Topology including a bolt which opening Http Request to webservice.
>>>>> The average response is 500 milliseconds (how-ever sometimes it takes
>>>>> longer)
>>>>>
>>>>> * I added timeout functionality. and I am using KafkaSpout
>>>>>
>>>>> When I send messages one by one everything working fine but
>>>>>
>>>>> Under High throughput *that bolt is getting stuck and nothing get
>>>>> into there anymore.* and the worst thing I am having a "reply" of the
>>>>> messages
>>>>>
>>>>> The only way to get thru this is to reset kafka's offset. else the
>>>>> zookeeper still logging kafka's offset and messages are still replying
>>>>>
>>>>>
>>>>> 1. *Why Messages being replied? I dont need that*
>>>>> 2. Here is  my code example of the"ExternalServiceOutputBolt
>>>>>
>>>>> package com.mycompany.push.topology;
>>>>>
>>>>> import backtype.storm.metric.api.CountMetric;
>>>>> import backtype.storm.task.TopologyContext;
>>>>> import backtype.storm.topology.BasicOutputCollector;
>>>>> import backtype.storm.topology.OutputFieldsDeclarer;
>>>>> import backtype.storm.topology.base.BaseBasicBolt;
>>>>> import backtype.storm.tuple.Fields;
>>>>> import backtype.storm.tuple.Tuple;
>>>>> import backtype.storm.tuple.Values;
>>>>> import com.mycompany.push.dto.PushMessageRequestDTO;
>>>>> import com.mycompany.push.dto.PushMessageResponseDTO;
>>>>> import org.apache.http.NameValuePair;
>>>>> import org.apache.http.client.config.RequestConfig;
>>>>> import org.apache.http.client.entity.UrlEncodedFormEntity;
>>>>> import org.apache.http.client.methods.CloseableHttpResponse;
>>>>> import org.apache.http.client.methods.HttpPost;
>>>>> import org.apache.http.impl.client.CloseableHttpClient;
>>>>> import org.apache.http.impl.client.HttpClientBuilder;
>>>>> import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
>>>>> import org.apache.http.message.BasicNameValuePair;
>>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.net.SocketTimeoutException;
>>>>> import java.util.ArrayList;
>>>>> import java.util.List;
>>>>> import java.util.Map;
>>>>> import java.util.concurrent.LinkedBlockingQueue;
>>>>>
>>>>>
>>>>>
>>>>> public class ExternalServiceOutputBolt extends BaseBasicBolt {
>>>>>
>>>>>     private CloseableHttpClient httpClient;
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>>>>         declarer.declare(new Fields("pushMessageResponse"));
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void prepare(Map stormConf, TopologyContext context) {
>>>>>         externalServiceGraphUrl = (String) stormConf.get("externalServiceGraphUrl");
>>>>>         initMetrics(context);
>>>>>         httpClient = getHttpClientInstance();
>>>>>     }
>>>>>
>>>>>
>>>>>     @Override
>>>>>     public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
>>>>>         try {
>>>>>             received_message_counter.incr();
>>>>>             final PushMessageRequestDTO pushMessageRequestDTO = (PushMessageRequestDTO) tuple.getValueByField("pushMessage");
>>>>>             if (pushMessageRequestDTO != null) {
>>>>>                 PushMessageResponseDTO pushMessageResponseDTO = executePushNotificationRequest(pushMessageRequestDTO);
>>>>>                 returned_from_externalService_counter.incr();
>>>>>                 System.out.println("externalServiceOutputBolt,emit tupple with snid= " + pushMessageRequestDTO.getSnid() + " refId=" + pushMessageRequestDTO.getRefId());
>>>>>                 basicOutputCollector.emit(new Values(pushMessageResponseDTO));
>>>>>             }
>>>>>         } catch (Exception e) {
>>>>>             log.error("externalServiceOutputBolt. Error", e);
>>>>>         }
>>>>>     }
>>>>>
>>>>>     private PushMessageResponseDTO executePushNotificationRequest(PushMessageRequestDTO pushMessageRequestDTO) throws IOException {
>>>>>         PushMessageResponseDTO pushMessageResponseDTO = new PushMessageResponseDTO(pushMessageRequestDTO);
>>>>>         CloseableHttpResponse response = null;
>>>>>         try {
>>>>>
>>>>>             HttpPost post = new HttpPost("external.url");
>>>>>             post.setEntity(new UrlEncodedFormEntity(urlParameters));
>>>>>             response = httpClient.execute(post);
>>>>>             response.getEntity();
>>>>>             if (response.getStatusLine().getStatusCode() != 200) {
>>>>>                 received_not_status_200_counter.incr();
>>>>>             } else {
>>>>>                 received_status_200_counter.incr();
>>>>>             }
>>>>>             log.debug("externalServiceOutputBolt.onCompleted,  pushMessageRequestDTO=" + pushMessageResponseDTO.toString() + ", responseBody=" + response.getStatusLine().getReasonPhrase());
>>>>>             return pushMessageResponseDTO;
>>>>>         } catch (SocketTimeoutException e) {
>>>>>             received_time_out_counter.incr();
>>>>>             log.error("externalServiceOutputBolt, TimeoutException", e);
>>>>>
>>>>>         } catch (Throwable t) {
>>>>>             received_fail_status_counter.incr();
>>>>>             pushMessageResponseDTO.setFbResponse(PushMessageResponseDTO.fbResponseStatus.FAIL);
>>>>>             if (t.getMessage() != null) {
>>>>>                 log.error("externalServiceOutputBolt, error executing externalService API. errorMsg=" + t.getMessage(), t);
>>>>>             }
>>>>>         } finally {
>>>>>             if (response != null) {
>>>>>                 response.close();
>>>>>             }
>>>>>         }
>>>>>         return pushMessageResponseDTO;
>>>>>     }
>>>>>
>>>>>     private CloseableHttpClient getHttpClientInstance() {
>>>>>         PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
>>>>>         cm.setDefaultMaxPerRoute(100);
>>>>>         cm.setMaxTotal(500);
>>>>>         int timeout = 4;
>>>>>         RequestConfig config = RequestConfig.custom()
>>>>>                 .setConnectTimeout(timeout * 1000) //in millis
>>>>>                 .setConnectionRequestTimeout(timeout * 1000)
>>>>>                 .setSocketTimeout(timeout * 1000).build();
>>>>>         return HttpClientBuilder.create().
>>>>>                 setDefaultRequestConfig(config).
>>>>>                 setConnectionManager(cm).
>>>>>                 build();
>>>>>     }
>>>>> }
>>>>>
>>>>> Thank you.
>>>>>
>>>>
>>>
>>
>

Re: Topology is failing using HttpClient on high throughput

Posted by Haralds Ulmanis <ha...@evilezh.net>.
try to replace:
    basicOutputCollector.emit(new Values(pushMessageResponseDTO));
with
    basicOutputCollector.emit(tuple, new Values(pushMessageResponseDTO));
    collector.ack(tuple);




Re: Topology is failing using HttpClient on high throughput

Posted by Haralds Ulmanis <ha...@evilezh.net>.
And where do you ACK/FAIL tuples ?


Re: Topology is failing using HttpClient on high throughput

Posted by Nathan Leung <nc...@gmail.com>.
If your tuple times out, it will be failed at the spout. Think of it this
way: after the spout emits, your tuple has to make it through the output
queue and finish processing before the timeout period expires. Otherwise,
with KafkaSpout, it will be re-emitted.


1) Spout emits to output queue (max size: max spout pending)

2) tuple drains through output queue (size n, worst case is max spout
pending)

-- output queue 0 --
-- output queue 1 --
-- ... --
-- output queue n --

3) other processing?

4) http bolt

For the purpose of illustration, let's assume parallelism is 1 across the
board, max spout pending is 100, timeout is 30 seconds, and your http bolt
takes the worst case of 1 second per tuple (from your code).  If you are
getting 1 input tuple every 2 seconds, then you are fine.  Your spout
output queue is empty, your processing takes ~1 second, and you're done.
Now let's assume you get a flood of 100 tuples all of a sudden.  Your spout
pulls these off of Kafka and fills the output queue.  Now when it emits
the next tuple, that tuple has to wait for the 100 tuples in the output
queue ahead of it to drain before it can be processed.  In this example,
that will take 100 seconds.  Not only will this tuple time out, but tuples
in the queue ahead of it will as well.  These tuples will all need to be
replayed, and will continue to time out because there is too much in the
queue ahead of them.  You will see that the bolt is very busy, but your
topology is unable to make progress.

At least, it sounds like this might be what is happening.
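
A minimal sketch of the two knobs involved (topology.max.spout.pending and
topology.message.timeout.secs), assuming a standard TopologyBuilder setup.
The class name, component names and numbers below are illustrative, not
recommendations:

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

public class PushTopologyMain {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // builder.setSpout("kafka-spout", ...);   // KafkaSpout wiring omitted
        // builder.setBolt("external-service-bolt", new ExternalServiceOutputBolt(), 4)
        //        .shuffleGrouping("kafka-spout");

        Config conf = new Config();
        // Rule of thumb from the explanation above: keep
        //   maxSpoutPending * worst-case per-tuple latency
        // well under the message timeout, or tuples queued behind a burst
        // will time out at the spout and be replayed.
        conf.setMaxSpoutPending(20);     // cap on in-flight (un-acked) tuples per spout task
        conf.setMessageTimeoutSecs(30);  // tuples not fully acked within 30s are failed and replayed

        StormSubmitter.submitTopology("push-topology", conf, builder.createTopology());
    }
}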


Re: Topology is failing using HttpClient on high throughput

Posted by Idan Fridman <id...@gmail.com>.
By the metrics I can see some errors, yes.

But if I use try/catch, why would they time out in a loop? Once they time
out, I am catching them and logging them, and that's it.
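
The try/catch inside the bolt only decides whether the surrounding wrapper
acks or fails the tuple; it does not stop the spout-side timeout clock. A
paraphrased sketch of what Storm does around a basic bolt is below
(illustrative only, not the actual BasicBoltExecutor source). Because the
catch block in ExternalServiceOutputBolt swallows the exception, the tuple
is always acked here, so replays would be coming from tuples that exceed
topology.message.timeout.secs and get failed at the spout:

import backtype.storm.task.OutputCollector;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.FailedException;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.ReportedFailedException;
import backtype.storm.tuple.Tuple;

// Paraphrased sketch of the ack/fail handling Storm wraps around a basic bolt.
class BasicAckSketch {
    private final IBasicBolt bolt;
    private final OutputCollector collector;

    BasicAckSketch(IBasicBolt bolt, OutputCollector collector) {
        this.bolt = bolt;
        this.collector = collector;
    }

    void handle(Tuple input, BasicOutputCollector basicCollector) {
        try {
            bolt.execute(input, basicCollector);
            collector.ack(input);          // acked as soon as execute() returns normally
        } catch (FailedException e) {
            if (e instanceof ReportedFailedException) {
                collector.reportError(e);
            }
            collector.fail(input);         // failed only when execute() throws FailedException
        }
    }
}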


Re: Topology is failing using HttpClient on high throughput

Posted by Nathan Leung <nc...@gmail.com>.
Do you have a large number of errored tuples in this topology? You might be
running into a situation where tuples time out in a loop.