Posted to user@storm.apache.org by "Nick R. Katsipoulakis" <ni...@gmail.com> on 2015/09/08 21:19:34 UTC
UIs ack statistics are not updated
Hello all,
I am running a topology to benchmark my cluster. In it, I anchor
tuples and acknowledge them for exactly-once processing and in order to
see the complete latency metric on the Storm UI. However, the "Complete
Latency" and "Acked" metric values for my spouts remain 0, and I guess
this means that the acks are not being reported properly.
My Topology's code is really simple and consists of the following three
classes:
public static class TestWordSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;

    public void open(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void nextTuple() {
        final String[] words = new String[] {"nathan", "mike",
                "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        Values tuple = new Values();
        tuple.add((new Long(System.currentTimeMillis())).toString());
        tuple.add(word);
        _collector.emit(tuple);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] schema = { "timestamp", "word" };
        declarer.declare(new Fields(schema));
    }
}
My intermediate bolt's code is the following:
public static class IntermediateBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        Values v = new Values();
        v.add(tuple.getString(0));
        v.add(tuple.getString(1));
        _collector.emit(tuple, v);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        String[] schema = { "timestamp", "word" };
        declarer.declare(new Fields(schema));
    }
}
And finally, my sink bolts (the last bolts in my topology) are the
following:
public static class SinkBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        String[] schema = {"timestamp", "word"};
        outputFieldsDeclarer.declare(new Fields(schema));
    }
}
So, I have a simple 3-level topology (spout, intermediate-bolt, sink-bolt)
just to measure my cluster. However, as I mentioned above, in the UI the
"Complete latency" and the "Acked" metrics are not updated for my spouts.
Am I doing something wrong? Please note that I ack a tuple only
at the SinkBolt. Is this the reason that my metrics are not updated?
Thanks,
Nick
Re: UIs ack statistics are not updated
Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Thank you Matthias. I got it now.
By the way, I am measuring end-to-end latency for a topology with an input
rate of 4500 tuples per second. Each tuple is about 64 bytes long, and my
cluster consists of 4x m4.xlarge supervisor nodes and 3x m4.xlarge nodes
for ZooKeeper and Nimbus.
My topology consists of about 9 to 36 bolts and 4 spouts, and the deepest
pipeline a tuple has to go through is 5 bolts long. By pipeline I mean
something like spout-a -> bolt-1 -> bolt-2 -> ... -> bolt-out.
I am measuring a complete latency (through the spouts' ack() function) of
around 800 msec. Is this normal?
Thanks,
Nick
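For reference, one way to cross-check the UI figure is to time tuples inside the spout itself. The sketch below is a minimal, hypothetical helper (the class CompleteLatencySketch and its methods are assumptions for illustration, not part of Storm): the spout would call onEmit where it emits a tuple with a message ID, and onAck from its ack(Object msgId) callback. Timestamps are passed in explicitly so the example runs without a cluster.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper a spout could keep; not part of Storm. */
class CompleteLatencySketch {
    // message ID -> time (ms) at which the spout emitted the tuple
    private final Map<Object, Long> emitTimesMs = new HashMap<>();
    private long totalMs = 0;
    private long acked = 0;

    /** Call where the spout emits a tuple with this message ID. */
    void onEmit(Object msgId, long nowMs) {
        emitTimesMs.put(msgId, nowMs);
    }

    /** Call from the spout's ack(Object msgId) callback. */
    void onAck(Object msgId, long nowMs) {
        Long start = emitTimesMs.remove(msgId);
        if (start == null) {
            return; // unknown or already completed ID
        }
        totalMs += nowMs - start;
        acked++;
    }

    /** Call from fail(Object msgId) so failed tuples do not pile up. */
    void onFail(Object msgId) {
        emitTimesMs.remove(msgId);
    }

    double meanLatencyMs() {
        return acked == 0 ? 0.0 : (double) totalMs / acked;
    }

    int inFlight() {
        return emitTimesMs.size();
    }

    public static void main(String[] args) {
        CompleteLatencySketch tracker = new CompleteLatencySketch();
        tracker.onEmit("a", 1000L);
        tracker.onEmit("b", 1010L);
        tracker.onAck("a", 1800L); // 800 ms
        tracker.onAck("b", 1830L); // 820 ms
        System.out.println(tracker.meanLatencyMs()); // 810.0
    }
}
```

Watching inFlight() alongside the mean can hint at whether tuples spend most of that time waiting in queues rather than being processed.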
On Wed, Sep 9, 2015 at 11:53 AM, Matthias J. Sax <mj...@apache.org> wrote:
> There is no such thing for Bolts.
>
> The call to Spout.ack(...) happens after Storm retrieved all acks of all
> (transitively) anchored tuples.
>
> Let's say you have spout -> bolt1 -> bolt2
>
> Spout emit t1 which is processed by bolt1.
> bolt1 emits t2 (with anchor t1) and acks t1.
> => there will be no call to Spout.ack(...) yet, because t2 is pending
> (ie, was not acked).
> After bolt2 processed and acked t2, the callback to Spout.ack(t1) happens.
>
>
> -Matthias
--
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
Re: UIs ack statistics are not updated
Posted by "Matthias J. Sax" <mj...@apache.org>.
There is no such thing for Bolts.
The call to Spout.ack(...) happens after Storm has received the acks of all
(transitively) anchored tuples.
Let's say you have spout -> bolt1 -> bolt2.
The spout emits t1, which is processed by bolt1.
bolt1 emits t2 (with anchor t1) and acks t1.
=> There is no call to Spout.ack(...) yet, because t2 is still pending
(i.e., it has not been acked).
After bolt2 has processed and acked t2, the callback to Spout.ack(t1) happens.
-Matthias
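The sequence above can be replayed with a toy model. This is only a sketch under stated assumptions: AckTreeSketch and its methods are hypothetical names, and the code is not Storm's acker. It borrows the idea described in Storm's documentation of XOR-ing tuple ids into one value per spout tuple, so that the value returns to zero once every tuple in the tree has been both emitted and acked.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of ack tracking; not Storm's actual acker code. */
class AckTreeSketch {
    // spout message ID -> XOR of the ids of all tuples still pending in its tree
    private final Map<String, Long> pending = new HashMap<>();
    // message IDs for which the simulated Spout.ack callback has fired
    final List<String> spoutAcks = new ArrayList<>();

    /** The spout emits a root tuple with a message ID. */
    void spoutEmit(String msgId, long tupleId) {
        pending.put(msgId, tupleId);
    }

    /** A bolt emits a new tuple anchored to a tuple in the tree of msgId. */
    void anchoredEmit(String msgId, long newTupleId) {
        pending.computeIfPresent(msgId, (k, v) -> v ^ newTupleId);
    }

    /** A bolt acks a tuple that belongs to the tree of msgId. */
    void ack(String msgId, long tupleId) {
        Long value = pending.get(msgId);
        if (value == null) {
            return; // tree is not tracked (or already complete)
        }
        long updated = value ^ tupleId;
        if (updated == 0L) {
            pending.remove(msgId);
            spoutAcks.add(msgId); // tree complete: Spout.ack would be called
        } else {
            pending.put(msgId, updated);
        }
    }

    public static void main(String[] args) {
        AckTreeSketch acker = new AckTreeSketch();
        long t1 = 0x1234L;
        long t2 = 0x5678L;
        acker.spoutEmit("msg-1", t1);     // spout emits t1
        acker.anchoredEmit("msg-1", t2);  // bolt1 emits t2 anchored to t1
        acker.ack("msg-1", t1);           // bolt1 acks t1
        System.out.println(acker.spoutAcks); // [] because t2 is still pending
        acker.ack("msg-1", t2);           // bolt2 acks t2
        System.out.println(acker.spoutAcks); // [msg-1]
    }
}
```

If bolt1 never acked t1, the value would never return to zero and the spout callback would not fire, which matches the symptom of "Acked" staying at 0.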
On 09/09/2015 03:55 PM, Nick R. Katsipoulakis wrote:
> Thank you Derek for your reply.
>
> I figured out the error and it was in my code (i was not acking all
> tuples properly). I have another question:
>
> I see that the BaseRichSpout has a callback function called ack(Object
> msgId) which is called when a tuple gets acknowledged. Is there similar
> functionality for Bolts? I see that the BaseRichBolt does not have one.
>
> Thanks,
> Nick
Re: UIs ack statistics are not updated
Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Thank you Derek for your reply.
I figured out the error and it was in my code (I was not acking all tuples
properly). I have another question:
I see that the BaseRichSpout has a callback function called ack(Object
msgId) which is called when a tuple gets acknowledged. Is there similar
functionality for Bolts? I see that the BaseRichBolt does not have one.
Thanks,
Nick
On Wed, Sep 9, 2015 at 9:45 AM, Derek Dagit <de...@yahoo-inc.com> wrote:
> The metrics used on the UI are aggregated in chunks.
>
> It could very well be that your code is working perfectly fine, and there
> is a threshold of emits/acks/fails that needs to be met before the numbers
> show up on the UI.
>
> Often I will see 0 on the UI until, for example, the number of emits
> reaches 20. And very often the numbers will increment by 20s too.
> --
> Derek
--
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
Re: UIs ack statistics are not updated
Posted by Andrew Xor <an...@gmail.com>.
Also, I think the metrics are not accurate in the sense that each and every
tuple is counted; they are really close, but not an *exact* count.
Regards.
Kindly yours,
Andrew Grammenos
-- PGP PKey --
<https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt
On Wed, Sep 9, 2015 at 4:45 PM, Derek Dagit <de...@yahoo-inc.com> wrote:
> The metrics used on the UI are aggregated in chunks.
>
> It could very well be that your code is working perfectly fine, and there
> is a threshold of emits/acks/fails that needs to be met before the numbers
> show up on the UI.
>
> Often I will see 0 on the UI until, for example, the number of emits
> reaches 20. And very often the numbers will increment by 20s too.
> --
> Derek
Re: UIs ack statistics are not updated
Posted by Derek Dagit <de...@yahoo-inc.com>.
The metrics used on the UI are aggregated in chunks.
It could very well be that your code is working perfectly fine, and there is a threshold of emits/acks/fails that needs to be met before the numbers show up on the UI.
Often I will see 0 on the UI until, for example, the number of emits reaches 20. And very often the numbers will increment by 20s too.
--
Derek
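The steps of 20 are consistent with Storm's sampled statistics: the setting topology.stats.sample.rate defaults to 0.05, so roughly one event in 20 is counted and each counted event stands in for 20. The sketch below is a deliberately simplified, deterministic model of that idea (SampledCounterSketch is a hypothetical class, and the real sampler differs in detail), meant only to show why a display can read 0 until the 20th event.

```java
/** Toy sampled counter; a simplification, not Storm's internal sampler. */
class SampledCounterSketch {
    private final int period;  // 1 / sample rate, e.g. 20 for a rate of 0.05
    private long seen = 0;     // events that actually happened
    private long reported = 0; // value a UI would display

    SampledCounterSketch(double sampleRate) {
        this.period = (int) Math.round(1.0 / sampleRate);
    }

    /** Record one emit/ack/fail event. */
    void record() {
        seen++;
        if (seen % period == 0) {
            reported += period; // one sampled event stands in for `period` events
        }
    }

    long reported() {
        return reported;
    }

    public static void main(String[] args) {
        SampledCounterSketch acked = new SampledCounterSketch(0.05);
        for (int i = 0; i < 19; i++) {
            acked.record();
        }
        System.out.println(acked.reported()); // 0
        acked.record();
        System.out.println(acked.reported()); // 20
        for (int i = 0; i < 25; i++) {
            acked.record();
        }
        System.out.println(acked.reported()); // 40
    }
}
```

Setting the sample rate to 1.0 makes such a counter exact, at the cost of doing the bookkeeping on every event.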
________________________________
From: Nick R. Katsipoulakis <ni...@gmail.com>
To: user@storm.apache.org
Sent: Wednesday, September 9, 2015 7:52 AM
Subject: Re: UIs ack statistics are not updated
Hello Javier and thank you for your reply.
I have a question about the Tuple ids. Do they have to be unique? I am asking because I have many spouts and they might emit identical tuples in the topology.
Also, do I have to ack a tuple only in the last bolt that processes it, so that the tuple tree created is complete? Or, do I have to ack each received tuple on every bolt?
Thanks,
Nick
--
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
Re: UIs ack statistics are not updated
Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Hello Javier and thank you for your reply.
I have a question about the Tuple ids. Do they have to be unique? I am
asking because I have many spouts and they might emit identical tuples in
the topology.
Also, do I have to ack a tuple only in the last bolt that processes it, so
that the tuple tree created is complete? Or, do I have to ack each received
tuple on every bolt?
Thanks,
Nick
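A hedged note on both questions. With BaseRichBolt, every bolt is expected to ack each input tuple it receives, not only the last bolt in the pipeline. As for IDs: the tuple values may repeat freely; it is the message ID passed at emit time that the spout gets back in ack() or fail(), and as far as I know it is handed back to the same spout task that emitted it. Under that assumption, IDs only need to be distinguishable within one task. The sketch below uses a hypothetical MsgIdSketch class (not a Storm API) that combines a task index with a counter.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-task message ID generator; not part of Storm. */
class MsgIdSketch {
    private final int taskIndex;                      // index of the spout task, assumed known
    private final AtomicLong sequence = new AtomicLong();

    MsgIdSketch(int taskIndex) {
        this.taskIndex = taskIndex;
    }

    /** Returns an ID such as "3-0", "3-1", ... that never repeats within this task. */
    String next() {
        return taskIndex + "-" + sequence.getAndIncrement();
    }

    public static void main(String[] args) {
        MsgIdSketch ids = new MsgIdSketch(3);
        System.out.println(ids.next()); // 3-0
        System.out.println(ids.next()); // 3-1
    }
}
```

Including the task index is not strictly needed under the assumption above, but it keeps IDs readable when logs from several spout tasks are merged.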
On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez <ja...@gmail.com> wrote:
> If I am reading your code correctly, it seems you're emitting from the
> spout without id - therefore, your acking efforts are not being used. You
> need to do something like:
>
> Object id = <anything you like>;
> _collector.emit(tuple, id);
>
> Regards,
> Javier
--
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate
Re: UIs ack statistics are not updated
Posted by Javier Gonzalez <ja...@gmail.com>.
If I am reading your code correctly, it seems you're emitting from the
spout without a message id, so Storm does not track those tuples and your
acks have no effect. You need to do something like:
Object id = <anything you like>;
_collector.emit(tuple, id);
Regards,
Javier
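To illustrate why a missing message id leaves the spout's "Acked" and
"Complete latency" at 0, here is a simplified plain-Java model of spout-side
tracking. It is a sketch under stated assumptions, not Storm's
implementation: the class PendingTrackerSketch and its methods are
hypothetical, and timestamps are passed in explicitly so the example is
deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of spout-side tracking; NOT Storm's implementation.
class PendingTrackerSketch {
    // message id -> emit time (ms) for tuples still in flight
    private final Map<Object, Long> emitTimeMs = new HashMap<>();
    private long acked = 0;
    private long totalLatencyMs = 0;

    // Emitting with a null id models emit(tuple): nothing is tracked.
    void emit(Object msgId, long nowMs) {
        if (msgId != null) {
            emitTimeMs.put(msgId, nowMs);
        }
    }

    // Called when the whole tuple tree for msgId has been acked.
    void ack(Object msgId, long nowMs) {
        Long start = emitTimeMs.remove(msgId);
        if (start != null) {          // untracked tuples never reach the counters
            acked++;
            totalLatencyMs += nowMs - start;
        }
    }

    long ackedCount() {
        return acked;
    }

    // Mean time from emit to full ack, or 0 when nothing has been acked.
    double completeLatencyMs() {
        return acked == 0 ? 0.0 : (double) totalLatencyMs / acked;
    }

    public static void main(String[] args) {
        PendingTrackerSketch tracker = new PendingTrackerSketch();
        tracker.emit(null, 0L);       // like _collector.emit(tuple)
        tracker.ack(null, 5L);
        System.out.println("acked without id: " + tracker.ackedCount());

        tracker.emit("msg-1", 10L);   // like _collector.emit(tuple, id)
        tracker.ack("msg-1", 30L);
        System.out.println("acked with id: " + tracker.ackedCount()
                + ", latency ms: " + tracker.completeLatencyMs());
    }
}
```

In this model a tuple emitted without an id never enters the pending map, so
no later ack can update the counters, which matches the zeros reported in
the original post.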