Posted to user@storm.apache.org by "Nick R. Katsipoulakis" <ni...@gmail.com> on 2015/09/08 21:19:34 UTC

UIs ack statistics are not updated

Hello all,

I am running a topology to benchmark my cluster. In it, I anchor
tuples and I acknowledge them for exactly-once processing, and in order to
see the complete latency metric on the Storm UI. However, the "Complete
Latency" and the "Acked" metric values for my spouts remain 0, which I take
to mean they are not being reported properly.

My topology's code is really simple and consists of the following three
classes:

public static class TestWordSpout extends BaseRichSpout {

   SpoutOutputCollector _collector;

   public void open(@SuppressWarnings("rawtypes") Map conf,
         TopologyContext context, SpoutOutputCollector collector) {
      _collector = collector;
   }

   public void nextTuple() {
      final String[] words = new String[] {"nathan", "mike",
            "jackson", "golda", "bertels"};
      final Random rand = new Random();
      final String word = words[rand.nextInt(words.length)];
      Values tuple = new Values();
      tuple.add(Long.toString(System.currentTimeMillis()));
      tuple.add(word);
      _collector.emit(tuple);
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      String[] schema = { "timestamp", "word" };
      declarer.declare(new Fields(schema));
   }
}

My intermediate bolt's code is the following:

public static class IntermediateBolt extends BaseRichBolt {

   OutputCollector _collector;

   @Override
   public void prepare(@SuppressWarnings("rawtypes") Map conf,
         TopologyContext context, OutputCollector collector) {
      _collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Values v = new Values();
      v.add(tuple.getString(0));
      v.add(tuple.getString(1));
      _collector.emit(tuple, v);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      String[] schema = { "timestamp", "word" };
      declarer.declare(new Fields(schema));
   }
}

And finally, my sink bolts (the last bolts in my topology) are the
following:

public static class SinkBolt extends BaseRichBolt {

   OutputCollector _collector;

   @Override
   public void prepare(@SuppressWarnings("rawtypes") Map conf,
         TopologyContext context, OutputCollector collector) {
      _collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      _collector.ack(tuple);
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
      String[] schema = { "timestamp", "word" };
      outputFieldsDeclarer.declare(new Fields(schema));
   }
}

So, I just have a 3-level topology (spout, intermediate-bolt, sink-bolt)
to measure my cluster. However, as I mentioned above, in the UI the
"Complete latency" and the "Acked" metrics are not updated for my spouts.
Am I doing something wrong? Please note that I ack a tuple only
at the SinkBolt. Is this the reason that my metrics are not updated?

Thanks,
Nick

Re: UIs ack statistics are not updated

Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Thank you Matthias. I got it now.

By the way, I am measuring end-to-end latency for a topology with an input
rate of 4500 tuples per second, where each tuple is about 64 bytes long. My
cluster consists of 4 m4.xlarge supervisor nodes and 3 m4.xlarge nodes
for ZooKeeper and Nimbus.

My topology consists of about 9 to 36 bolts and 4 spouts, and the deepest
pipeline a tuple has to go through is 5 bolts long. By pipeline I mean
something like spout-a -> bolt-1 -> bolt-2 -> ... -> bolt-out.

I am measuring a complete latency (through the spouts' ack() callback) of
around 800 msec. Is this normal?
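As a side note, the complete-latency figure can be cross-checked at the sink by parsing the timestamp field the spout emits. A minimal, self-contained sketch (the class and method names here are mine, not part of the topology above):

```java
public class LatencySketch {

    // The spout stores System.currentTimeMillis() as a string in field 0;
    // the sink parses it back and subtracts it from its own clock reading.
    static long latencyMs(String emittedTimestamp, long receivedAtMs) {
        return receivedAtMs - Long.parseLong(emittedTimestamp);
    }

    public static void main(String[] args) {
        // A tuple emitted at t=1000 ms and received at t=1800 ms
        // shows 800 ms of end-to-end latency.
        System.out.println(latencyMs("1000", 1800L)); // prints 800
    }
}
```

Note that this only gives a meaningful number if the spout and sink machines have synchronized clocks (e.g. via NTP), unlike the UI's complete latency, which is measured entirely on the spout side.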

Thanks,
Nick

-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate

Re: UIs ack statistics are not updated

Posted by "Matthias J. Sax" <mj...@apache.org>.
There is no such callback for Bolts.

The call to Spout.ack(...) happens after Storm has received acks for all
(transitively) anchored tuples.

Let's say you have spout -> bolt1 -> bolt2

The spout emits t1, which is processed by bolt1.
bolt1 emits t2 (with anchor t1) and acks t1.
=> there is no call to Spout.ack(...) yet, because t2 is pending
(ie, was not acked).
After bolt2 has processed and acked t2, the callback to Spout.ack(t1) happens.


-Matthias
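The anchoring pattern described above, written out as bolt1 (a sketch against the 2015-era backtype.storm API; the class name is illustrative):

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// bolt1 in the example: it anchors its output to the input before acking,
// so the tuple tree stays pending until bolt2 acks the child tuple.
public class AnchoringBolt extends BaseRichBolt {

    OutputCollector _collector;

    @Override
    public void prepare(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void execute(Tuple t1) {
        // Anchored emit: t2 becomes a child of t1 in the tuple tree.
        _collector.emit(t1, new Values(t1.getString(0), t1.getString(1)));
        // Ack t1; Spout.ack() still waits until t2 is acked downstream.
        _collector.ack(t1);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("timestamp", "word"));
    }
}
```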





Re: UIs ack statistics are not updated

Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Thank you Derek for your reply.

I figured out the error and it was in my code (I was not acking all tuples
properly). I have another question:

I see that BaseRichSpout has a callback function, ack(Object msgId),
which is called when a tuple gets acknowledged. Is there similar
functionality for Bolts? I see that BaseRichBolt does not have one.

Thanks,
Nick


Re: UIs ack statistics are not updated

Posted by Andrew Xor <an...@gmail.com>.
Also, I think the metrics are not accurate in the sense that not each and
every tuple is counted; they are really close, but not an *exact* count.

Regards.

Kindly yours,

Andrew Grammenos

-- PGP PKey --
​ <https://www.dropbox.com/s/2kcxe59zsi9nrdt/pgpsig.txt>
https://www.dropbox.com/s/yxvycjvlsc111bh/pgpsig.txt


Re: UIs ack statistics are not updated

Posted by Derek Dagit <de...@yahoo-inc.com>.
The metrics used on the UI are aggregated in chunks.

It could very well be that your code is working perfectly fine, and there is a threshold of emits/acks/fails that needs to be met before the numbers show up on the UI.

Often I will see 0 on the UI until, for example, the number of emits reaches 20.  And very often the numbers will increment by 20s too.

-- 
Derek



Re: UIs ack statistics are not updated

Posted by "Nick R. Katsipoulakis" <ni...@gmail.com>.
Hello Javier and thank you for your reply.

I have a question about the Tuple ids. Do they have to be unique? I am
asking because I have many spouts and they might emit identical tuples in
the topology.

Also, do I have to ack a tuple only in the last bolt that processes it, so
that the tuple tree created is complete? Or, do I have to ack each received
tuple on every bolt?
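
(Thinking it through against Storm's documented acker model may help here:
the acker keeps one XOR checksum of random 64-bit edge ids per spout
tuple, so the checksum only returns to zero if every bolt acks each input
tuple it receives. Below is a toy, Storm-free sketch of that bookkeeping
for a spout -> intermediate -> sink chain; all variable names are
illustrative.)

```java
import java.util.Random;

// Toy model of the acker's bookkeeping. Each anchored edge gets a random
// 64-bit id; emitting XORs the id in, acking XORs it back out. The
// checksum reaches 0 exactly when every emitted edge has been acked.
public class AckerSketch {
    public static void main(String[] args) {
        Random rnd = new Random();
        long checksum = 0;

        long spoutEdge = rnd.nextLong();
        checksum ^= spoutEdge;             // spout emits, anchored with a message id

        long boltEdge = rnd.nextLong();
        checksum ^= spoutEdge ^ boltEdge;  // intermediate bolt acks input, anchors output

        checksum ^= boltEdge;              // sink bolt acks input, emits nothing

        System.out.println(checksum == 0); // true: tree complete, spout.ack() fires
    }
}
```

(If the intermediate bolt never acks, the `spoutEdge ^ boltEdge` term is
never applied and the checksum stays non-zero until the tuple times out.)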

Thanks,
Nick

On Wed, Sep 9, 2015 at 3:29 AM, Javier Gonzalez <ja...@gmail.com> wrote:

> If I am reading your code correctly, it seems you're emitting from the
> spout without id - therefore, your acking efforts are not being used. You
> need to do something like:
>
> Object id= <anything you like>;
> _collector.emit(id,tuple);
>
> Regards,
> Javier
> On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" <ni...@gmail.com>
> wrote:
>
>> Hello all,
>>
>> I am running a topology for bench marking my cluster. In it, I anchor
>> tuples and I acknowledge them for exactly-once processing and in order to
>> see the complete latency metric on the Storm UI. However, the "Complete
>> Latency" and the "Acked" metric values for my spouts remain 0 and I guess
>> that this translates to not being reported properly.
>>
>> My Topology's code is really simple and consists of the following three
>> classes:
>>
>> public static class TestWordSpout extends BaseRichSpout {
>>
>>    SpoutOutputCollector _collector;
>>
>>    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
>>       _collector = collector;
>>    }
>>    public void nextTuple() {
>>       final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
>>       final Random rand = new Random();
>>       final String word = words[rand.nextInt(words.length)];
>>       Values tuple = new Values();
>>       tuple.add((new Long(System.currentTimeMillis())).toString());
>>       tuple.add(word);
>>       _collector.emit(tuple);
>>    }
>>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>       String[] schema = { "timestamp", "word" };
>>       declarer.declare(new Fields(schema));
>>    }
>> }
>>
>> My intermediate bolts code is the following:
>>
>> public static class IntermediateBolt extends BaseRichBolt {
>>
>>       OutputCollector _collector;
>>
>>       @Override
>>       public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
>>          _collector = collector;
>>       }
>>       @Override
>>       public void execute(Tuple tuple) {
>>          Values v = new Values();
>>          v.add(tuple.getString(0));
>>          v.add(tuple.getString(1));
>>          _collector.emit(tuple, v);
>>       }
>>       @Override
>>       public void declareOutputFields(OutputFieldsDeclarer declarer) {
>>          String[] schema = { "timestamp", "word" };
>>          declarer.declare(new Fields(schema));
>>       }
>>    }
>>
>> And finally, my sink bolts (the last bolts in my topology) are the
>> following:
>>
>> public static class SinkBolt extends BaseRichBolt {
>>
>>    OutputCollector _collector;
>>
>>    @Override
>>    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
>>       _collector = collector;
>>    }
>>    @Override
>>    public void execute(Tuple tuple) {
>>          _collector.ack(tuple);
>>    }
>>    @Override
>>    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>>       String[] schema = {"timestamp", "word"};
>>       outputFieldsDeclarer.declare(new Fields(schema));
>>    }
>> }
>>
>> So, I just have a 3-level topology (spout, intermediate-bolt, sink-bolt)
>> just to measure my cluster. However, as I mentioned above, in the UI the
>> "Complete latency" and the "Acked" metrics are not updated for my spouts.
>> Am I doing something wrong? Please, pay attention that I ack a tuple only
>> at the SinkBolt. Is this the reason that my metrics are not updated?
>>
>> Thanks,
>> Nick
>>
>>


-- 
Nikolaos Romanos Katsipoulakis,
University of Pittsburgh, PhD candidate

Re: UIs ack statistics are not updated

Posted by Javier Gonzalez <ja...@gmail.com>.
If I am reading your code correctly, it seems you're emitting from the
spout without id - therefore, your acking efforts are not being used. You
need to do something like:

Object id= <anything you like>;
_collector.emit(id,tuple);
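
A slightly fuller, runnable sketch of the pattern is below. It uses a
stand-in emit/ack so it runs without a Storm dependency; in the real API
you call SpoutOutputCollector.emit(List<Object> tuple, Object messageId),
and Storm calls ack(Object msgId) / fail(Object msgId) back on your spout.
The pending map and UUID choice are illustrative:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: emit spout tuples with a message id and remember them until
// acked, so failed tuples can be replayed. The collector is a stand-in.
public class AnchoredSpoutSketch {
    // Tuples in flight, keyed by message id (so fail() can replay them).
    static final Map<Object, List<Object>> pending = new ConcurrentHashMap<>();

    static void emit(List<Object> tuple, Object msgId) {
        pending.put(msgId, tuple);
        // real code: _collector.emit(tuple, msgId);
    }

    // Storm invokes these on the spout when the tuple tree completes or times out.
    static void ack(Object msgId)  { pending.remove(msgId); }
    static void fail(Object msgId) {
        List<Object> t = pending.get(msgId);
        if (t != null) emit(t, msgId);         // replay the failed tuple
    }

    public static void main(String[] args) {
        Object id = UUID.randomUUID();         // any id unique among in-flight tuples
        emit(Arrays.asList(Long.toString(System.currentTimeMillis()), "nathan"), id);
        System.out.println(pending.size());    // 1: tuple in flight
        ack(id);
        System.out.println(pending.size());    // 0: tree completed
    }
}
```

(On the uniqueness question: as far as I know the message id only has to
distinguish the in-flight tuples of a given spout instance; Storm uses it
as a key, not a hash of the tuple contents, so identical tuple values are
fine as long as their ids differ.)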

Regards,
Javier
On Sep 8, 2015 3:19 PM, "Nick R. Katsipoulakis" <ni...@gmail.com>
wrote:

> Hello all,
>
> I am running a topology for bench marking my cluster. In it, I anchor
> tuples and I acknowledge them for exactly-once processing and in order to
> see the complete latency metric on the Storm UI. However, the "Complete
> Latency" and the "Acked" metric values for my spouts remain 0 and I guess
> that this translates to not being reported properly.
>
> My Topology's code is really simple and consists of the following three
> classes:
>
> public static class TestWordSpout extends BaseRichSpout {
>
>    SpoutOutputCollector _collector;
>
>    public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, SpoutOutputCollector collector) {
>       _collector = collector;
>    }
>    public void nextTuple() {
>       final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
>       final Random rand = new Random();
>       final String word = words[rand.nextInt(words.length)];
>       Values tuple = new Values();
>       tuple.add((new Long(System.currentTimeMillis())).toString());
>       tuple.add(word);
>       _collector.emit(tuple);
>    }
>    public void declareOutputFields(OutputFieldsDeclarer declarer) {
>       String[] schema = { "timestamp", "word" };
>       declarer.declare(new Fields(schema));
>    }
> }
>
> My intermediate bolts code is the following:
>
> public static class IntermediateBolt extends BaseRichBolt {
>
>       OutputCollector _collector;
>
>       @Override
>       public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
>          _collector = collector;
>       }
>       @Override
>       public void execute(Tuple tuple) {
>          Values v = new Values();
>          v.add(tuple.getString(0));
>          v.add(tuple.getString(1));
>          _collector.emit(tuple, v);
>       }
>       @Override
>       public void declareOutputFields(OutputFieldsDeclarer declarer) {
>          String[] schema = { "timestamp", "word" };
>          declarer.declare(new Fields(schema));
>       }
>    }
>
> And finally, my sink bolts (the last bolts in my topology) are the
> following:
>
> public static class SinkBolt extends BaseRichBolt {
>
>    OutputCollector _collector;
>
>    @Override
>    public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context, OutputCollector collector) {
>       _collector = collector;
>    }
>    @Override
>    public void execute(Tuple tuple) {
>          _collector.ack(tuple);
>    }
>    @Override
>    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
>       String[] schema = {"timestamp", "word"};
>       outputFieldsDeclarer.declare(new Fields(schema));
>    }
> }
>
> So, I just have a 3-level topology (spout, intermediate-bolt, sink-bolt)
> just to measure my cluster. However, as I mentioned above, in the UI the
> "Complete latency" and the "Acked" metrics are not updated for my spouts.
> Am I doing something wrong? Please, pay attention that I ack a tuple only
> at the SinkBolt. Is this the reason that my metrics are not updated?
>
> Thanks,
> Nick
>
>