Posted to user@storm.apache.org by Olivier Mallassi <ol...@gmail.com> on 2015/07/02 12:04:30 UTC

ack looks long

Hi all

I investigated in more detail (1 event per second, emitTimeInterval 1 sec) and
it appears that most of my 9 ms are spent in the exchange with __acker (stream
__ack_ack).

Looking at the traces below, and if I understood correctly, I spend 7-8 ms
between the "Emitting direct: ... __acker __ack_ack" line and the corresponding
"Processing received message" line.

2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
b-0 __ack_ack [671062448715243437 2548740402887507265]
2015-07-02T08:57:49.311+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: b-0:4, stream: __ack_ack, id: {},
[671062448715243437 2548740402887507265]
2015-07-02T08:57:49.311+0200 backtype.storm.daemon.task 0 [INFO] Emitting
direct: 1; __acker __ack_ack [671062448715243437]
******************************************
2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: __acker:3, stream: __ack_ack, id: {},
[671062448715243437]
2015-07-02T08:57:49.318+0200 backtype.storm.daemon.executor 0 [INFO] Acking
message 175873:0

2015-07-02T08:57:49.318+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
$mastercoord-bg0 $commit [175873:0]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
$mastercoord-bg0 __ack_init [-4460222877314276996 -348189169909316069 1]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: $mastercoord-bg0:1, stream: $commit,
id: {-4460222877314276996=-348189169909316069}, [175873:0]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: $mastercoord-bg0:1, stream: __ack_init,
id: {}, [-4460222877314276996 -348189169909316069 1]
[------------------------------------------ the Aggregator is called ]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting:
b-0 __ack_ack [-4460222877314276996 -348189169909316069]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: b-0:4, stream: __ack_ack, id: {},
[-4460222877314276996 -348189169909316069]
2015-07-02T08:57:49.319+0200 backtype.storm.daemon.task 0 [INFO] Emitting
direct: 1; __acker __ack_ack [-4460222877314276996]
******************************************
2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO]
Processing received message source: __acker:3, stream: __ack_ack, id: {},
[-4460222877314276996]
2015-07-02T08:57:49.329+0200 backtype.storm.daemon.executor 0 [INFO] Acking
message 175873:0


This delay is systematic for all tuples (and not related to GC pauses...).
Once again, when running the same topology (code + config) in local mode, I
do not observe this kind of wait.

Does anyone have an idea of what is going on? Is there a particular code
section I could look at?

thx.

olivier.



On Tue, Jun 30, 2015 at 5:37 PM, Olivier Mallassi <
olivier.mallassi@gmail.com> wrote:

> Hi all
>
> Here is the code of my topology
>
> topology.newStream(this.getTopologyName(), spout)
>         .each(new Fields("event"), new EventTypeFilter(MyEvent.class))
>         .each(new Fields("event"), flattern, new Fields("a", "b", "c"))
>         .parallelismHint(1)
>         .groupBy(new Fields("a", "b", "c"))
>         .persistentAggregate(new MemoryMapState.StateFactory<>(),
>                 new Fields("event"),
>                 new EventAggregator(),
>                 new Fields("aggr"))
>         .parallelismHint(1);
>
> I ran the topology with no parallelism and a throughput of 20 events per
> second, to avoid any kind of contention...
>
>
> By adding some traces, it looks like I spend ~8 ms between the end of the
> "flattern" bolt and the getAll() method implemented in the MemoryMapState.
>
> It looks weird, but is it the expected behavior? Do you have any idea what
> could cause this delay (which is as regular as a metronome)?
>
> I am running storm 0.9.4.
>
> Regards.
>
> olivier.
>
>
>
>
> On Mon, Jun 29, 2015 at 6:41 PM, Olivier Mallassi <
> olivier.mallassi@gmail.com> wrote:
>
>> Hello all
>>
>> I am facing an issue, or at least something I cannot figure out, regarding
>> the end-to-end latency of my topology.
>> I hope you will be able to help me.
>>
>> *Topology *
>> I am running a Trident topology:
>> - an IBatchSpout emits events of different types (1 spout only)
>> - a filter checks the type ("instance of") of each event and returns
>> true in most cases (b-1); a minimal sketch of such a filter is shown
>> right after this list
>> - a bolt (b-0) flattens the event
>> - the stream is grouped by 3 fields of the event
>> - the events are aggregated with persistentAggregate, a custom reducer
>> and a simple MemoryMapState object
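>>
>> (For reference, a minimal sketch of what such an instance-of filter can
>> look like with the Trident API; this is not my actual EventTypeFilter,
>> just the idea:)
>>
>> import storm.trident.operation.BaseFilter;
>> import storm.trident.tuple.TridentTuple;
>>
>> // Keeps only the tuples whose "event" field is of the expected type.
>> public class EventTypeFilter extends BaseFilter {
>>     private final Class<?> expectedType;
>>
>>     public EventTypeFilter(Class<?> expectedType) {
>>         this.expectedType = expectedType;
>>     }
>>
>>     @Override
>>     public boolean isKeep(TridentTuple tuple) {
>>         // field 0 is "event" since the filter is declared on new Fields("event")
>>         return expectedType.isInstance(tuple.getValue(0));
>>     }
>> }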
>>
>> With my current data set, the events are grouped into 19 different
>> aggregates.
>>
>> In these cases, I run with:
>> - a single worker, with a single spout and multiple executors (10)
>> - emit interval millis = 1
>> - TOPOLOGY_EXECUTOR_RECEIVE / SEND_BUFFER_SIZE set to a large value,
>> 16384 (a rough sketch of these settings is shown right after this list)
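>>
>> (Roughly, these settings are applied like this; a sketch only, the exact
>> keys are worth double-checking against the 0.9.4 defaults.yaml:)
>>
>> import backtype.storm.Config;
>>
>> Config conf = new Config();
>> conf.setNumWorkers(1);                                     // single worker
>> conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
>> conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
>> // how often Trident emits a new batch, in ms
>> conf.put("topology.trident.batch.emit.interval.millis", 1);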
>>
>> I measure my end-to-end latency by adding a timestamp to the event just
>> before the _collector.emit(), and then measuring the delta in the reducer.
>> So this latency is the time an event needs to be aggregated.
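>>
>> (For what it is worth, the measurement itself is nothing more than the
>> following kind of thing; a simplified sketch, where MyEvent.getEmitTimestamp()
>> and recordLatency() stand for however the timestamp is stored and collected,
>> and where the real EventAggregator of course builds the actual aggregate:)
>>
>> import storm.trident.operation.ReducerAggregator;
>> import storm.trident.tuple.TridentTuple;
>>
>> public class LatencyMeasuringAggregator implements ReducerAggregator<Long> {
>>     @Override
>>     public Long init() {
>>         return 0L;
>>     }
>>
>>     @Override
>>     public Long reduce(Long curr, TridentTuple tuple) {
>>         MyEvent event = (MyEvent) tuple.getValueByField("event");
>>         // the timestamp was set just before _collector.emit() in the spout
>>         long endToEndMs = System.currentTimeMillis() - event.getEmitTimestamp();
>>         recordLatency(endToEndMs);   // e.g. feed a histogram / percentile tracker
>>         return curr + 1;             // placeholder for the real aggregation
>>     }
>>
>>     private void recordLatency(long ms) { /* omitted */ }
>> }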
>>
>> Here are the numbers I got
>>
>> case #1: events throughput = 40000
>>   (capacity, plus execute / process / complete latencies in ms; '-' = not reported)
>>
>>   component            capacity  execute   process   complete
>>   mastercoord-bg0      -         -         -         10.031
>>   $spoutcoord-spout0   0.022     0.243     0.181     -
>>   __acker              0.004     0.004     0.002     -
>>   b-0                  0.045     0.006     0.002     -
>>   b-1                  0.159     0.004     0.003     -
>>   spout0               0.150     1.673     1.919     -
>>
>>   measured end2end latency (99th percentile): 9.5 ms
>>
>> case #2: events throughput = 250
>>
>>   component            capacity  execute   process   complete
>>   mastercoord-bg0      -         -         -         10.026
>>   $spoutcoord-spout0   0.024     0.255     0.190     -
>>   __acker              0.006     0.005     0.002     -
>>   b-0                  0.006     0.020     0.024     -
>>   b-1                  0.004     0.018     0.016     -
>>   spout0               0.017     0.185     0.135     -
>>
>>   measured end2end latency (99th percentile): 9.5 ms
>>
>>
>> The good news is that the end-to-end latency is quite stable even though
>> the throughput of generated events has increased significantly.
>> The 'not so good' news is that I would like my latency to be under 1 ms
>> (again, with a single worker and at least at low throughput). Do you have
>> any ideas about how to configure the topology? Could avoiding Trident and
>> using the Storm API directly help?
>>
>> Do you know how I can better understand where I spend my time in this
>> topology?
>>
>> Note that I have tried to increase max.spout.pending, but the end-to-end
>> latency gets worse (as if everything was, in the end, single threaded).
>>
>> For sure, there is something I do not understand. Many thanks for your help.
>>
>> Olivier
>>
>> PS: when I run the second case in local mode (on my dev laptop), I
>> observe an end-to-end latency of around 1 to 2 ms.
>>
>>
>>
>

Re: ack looks long

Posted by Olivier Mallassi <ol...@gmail.com>.
Hi all,

To continue and go a little further, it looks like this is specific to
Solaris.

I have run the same topology, in local mode (LocalCluster):
- on Ubuntu 14.x and Solaris
- 1 event every 10 ms (emit_batch_interval)
- the same command line on both machines: java -server -Xmx2g -Xms2g
-XX:MaxNewSize=1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC  .... topology
main class, custom params
- java 1.8


The behavior on Linux is "as expected":
2015-07-03T13:43:*34.243*+0200 [Thread-11-__acker] INFO
 backtype.storm.daemon.task - Emitting direct: 1; __acker __ack_ack
[7224296586433968782]
2015-07-03T13:43:*34.243*+0200 [Thread-21-$mastercoord-bg0] INFO
 backtype.storm.daemon.executor - Processing received message source:
__acker:3, stream: __ack_ack, id: {}, [7224296586433968782]

but not on Solaris:
2015-07-03T14:28:*15.736*+0200 [Thread-11-__acker] INFO
 backtype.storm.daemon.task - Emitting direct: 1; __acker __ack_ack
[2253796205321724272]
2015-07-03T14:28:*15.744*+0200 [Thread-21-$mastercoord-bg0] INFO
 backtype.storm.daemon.executor - Processing received message source:
__acker:3, stream: __ack_ack, id: {}, [2253796205321724272]


Looking at the code / traces, it looks like this could be related to
DisruptorQueue.consumeBatch, which internally uses a SequenceBarrier.

Am I completely wrong? Can someone help me confirm or refute this (and
whether this is related to the Disruptor 2.10 behavior)?
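
To try to narrow this down outside of Storm, I am thinking of a small
JDK-only test of the Condition await/signal wake-up latency (which is what
the Disruptor BlockingWaitStrategy relies on under the hood, if I read it
correctly). A rough sketch, to be run on both boxes:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Measures how long a thread blocked in Condition.await() takes to wake up
// after another thread calls signalAll(), which is roughly what the
// Disruptor BlockingWaitStrategy does when a publisher hands over a slot.
public class ConditionWakeupBench {
    public static void main(String[] args) throws Exception {
        final ReentrantLock lock = new ReentrantLock();
        final Condition condition = lock.newCondition();
        final long[] signalNanos = new long[1];
        final int iterations = 1000;

        Thread waiter = new Thread(() -> {
            for (int i = 0; i < iterations; i++) {
                lock.lock();
                try {
                    // untimed wait; spurious wake-ups are ignored for this rough test
                    condition.await();
                    long wakeupDelayUs = (System.nanoTime() - signalNanos[0]) / 1000;
                    System.out.println("wake-up delay: " + wakeupDelayUs + " us");
                } catch (InterruptedException e) {
                    return;
                } finally {
                    lock.unlock();
                }
            }
        });
        waiter.start();

        for (int i = 0; i < iterations; i++) {
            Thread.sleep(10);                  // ~1 event every 10 ms, as in the test above
            lock.lock();
            try {
                signalNanos[0] = System.nanoTime();
                condition.signalAll();
            } finally {
                lock.unlock();
            }
        }
        waiter.interrupt();                    // in case a signal was missed while printing
        waiter.join();
    }
}

If the wake-up delay stays in the tens or hundreds of microseconds on Linux
but jumps to several milliseconds on Solaris, that would point at the OS-level
parking / wake-up rather than at Storm itself.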

Regards


Re: ack looks long

Posted by Olivier Mallassi <ol...@gmail.com>.
And it looks like each time the emitDirect / consumeBatch path is involved
(to send an ack to the mastercoord), I can expect a wait of a few ms:

2015-07-02T16:10:00.*128+0200* backtype.storm.daemon.task Thread-9-b-1
[INFO] Emitting: b-1 __ack_ack [4382299576881565710 1329423615189887198]
 Caller+0        at
clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1         at
clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3         at
backtype.storm.daemon.task$mk_tasks_fn$fn__3639.invoke(task.clj:152)
Caller+4         at
backtype.storm.daemon.task$send_unanchored.invoke(task.clj:111)
Caller+5         at
backtype.storm.daemon.task$send_unanchored.invoke(task.clj:117)
Caller+6         at
backtype.storm.daemon.executor$fn__4722$fn$reify__4767.ack(executor.clj:707)
Caller+7         at backtype.storm.task.OutputCollector.*ack*
(OutputCollector.java:213)
2015-07-02T16:10:00.*128+0200* backtype.storm.daemon.executor
Thread-5-__acker [INFO] Processing received message source: b-1:5, stream:
__ack_ack, id: {}, [4382299576881565710 1329423615189887198]
 Caller+0        at
clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1         at
clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3         at
backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:399)
Caller+4         at
backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
Caller+5         at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
Caller+6         at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
Caller+7         at backtype.storm.disruptor$*consume_batch_when_available*
.invoke(disruptor.clj:80)
2015-07-02T16:10:00.*128+0200* backtype.storm.daemon.task *Thread-5-__acker*
[INFO] Emitting direct: 1; __acker __ack_ack [4382299576881565710]
 Caller+0        at
clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1         at
clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3         at
backtype.storm.daemon.task$mk_tasks_fn$fn__3639.invoke(task.clj:134)
Caller+4         at
backtype.storm.daemon.executor$fn__4722$fn__4734$bolt_emit__4761.invoke(executor.clj:662)
Caller+5         at
backtype.storm.daemon.executor$fn__4722$fn$reify__4767.emitDirect(executor.clj:700)
Caller+6         at
backtype.storm.task.OutputCollector.emitDirect(OutputCollector.java:208)
Caller+7         at backtype.storm.task.OutputCollector.*emitDirect*
(OutputCollector.java:135)
2015-07-02T16:10:00.*133+0200* backtype.storm.daemon.executor
*Thread-15-$mastercoord-bg0* [INFO] Processing received message source:
__acker:3, stream: __ack_ack, id: {}, [4382299576881565710]
 Caller+0        at
clojure.tools.logging$eval1$fn__7.invoke(NO_SOURCE_FILE:0)
Caller+1         at
clojure.tools.logging.impl$fn__15$G__8__26.invoke(impl.clj:16)
Caller+2         at clojure.tools.logging$log_STAR_.invoke(logging.clj:59)
Caller+3         at
backtype.storm.daemon.executor$mk_task_receiver$fn__4645.invoke(executor.clj:399)
Caller+4         at
backtype.storm.disruptor$clojure_handler$reify__1446.onEvent(disruptor.clj:58)
Caller+5         at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
Caller+6         at
backtype.storm.utils.DisruptorQueue.consumeBatch(DisruptorQueue.java:87)
Caller+7         at backtype.storm.disruptor$*consume_batch*
.invoke(disruptor.clj:76)


Is this something someone has observed on a Linux platform? I am on Solaris.
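
One experiment I plan to try: swapping the disruptor wait strategy used by
the executor queues. If I remember correctly, 0.9.x exposes it as
topology.disruptor.wait.strategy, defaulting to the blocking (lock/condition
based) strategy; worth double-checking the exact key and accepted values in
the 0.9.4 defaults.yaml. Something like:

import backtype.storm.Config;

Config conf = new Config();
// key and class names from memory: the default should be
// com.lmax.disruptor.BlockingWaitStrategy; trying a non-blocking strategy
// to see whether the ~7-10 ms wake-up disappears
conf.put("topology.disruptor.wait.strategy",
         "com.lmax.disruptor.SleepingWaitStrategy");

If the delay disappears with a non-blocking strategy, that would point at the
timed/untimed condition wait on Solaris rather than at Storm itself.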
