Posted to user@storm.apache.org by 林海涛(信息技术部交易云技术研发组) <li...@gf.com.cn> on 2016/05/31 10:47:22 UTC

How to improve the intercommunication latency of spout/bolt

Hello.
I am testing the communication latency between a spout and a bolt with a simple topology. The spout emits the current System.nanoTime() value, and the bolt logs the difference between that value and its own System.nanoTime() reading when it receives the tuple.
My Storm cluster runs in Docker containers on my own machine (one nimbus, one supervisor), and I run the topology in cluster mode.
The code is below:


// Imports are omitted; the Storm package prefix depends on the version
// (org.apache.storm.* from 1.0 on, backtype.storm.* before that).
public class RandomSpout extends BaseRichSpout {

  SpoutOutputCollector _collector;

  public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
  }

  public void nextTuple() {
    // Emit one tuple per second carrying the emit time in nanoseconds.
    Utils.sleep(1000);
    long currentTime = System.nanoTime();
    _collector.emit(new Values(currentTime));
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("value"));
  }
}


public class PrintBolt extends BaseRichBolt {

  // LogFileWriter is a custom helper class, not shown here.
  private LogFileWriter _logFile;

  public void execute(Tuple tuple) {
    // Nanoseconds elapsed between the spout's emit and this bolt's receive.
    long prevTime = tuple.getLong(0);
    long currentTime = System.nanoTime();
    _logFile.writeLog("cost: " + (currentTime - prevTime));
  }

  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    try {
      _logFile = new LogFileWriter("StormTest", this.getClass().getSimpleName());
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("value"));
  }
}


public class Topology {

  public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSpout(), 1);
    builder.setBolt("bolt", new PrintBolt(), 1).shuffleGrouping("spout");

    Config conf = new Config();
    conf.setDebug(false);

    if (args.length > 0) {
      // Cluster submit: two workers, acking disabled.
      conf.setNumWorkers(2);
      conf.setNumAckers(0);
      try {
        StormSubmitter.submitTopology("stormTest", conf, builder.createTopology());
      } catch (Exception e) {
        e.printStackTrace();
      }
    } else {
      // No arguments: run in-process with a local cluster.
      new LocalCluster().submitTopology("stormTest", conf, builder.createTopology());
    }
  }
}


Output is below:

[2016-05-31 09:13:53]cost: 1960336
[2016-05-31 09:13:54]cost: 2600239
[2016-05-31 09:13:55]cost: 3103449
[2016-05-31 09:13:56]cost: 3206544
[2016-05-31 09:13:57]cost: 3783647
[2016-05-31 09:13:58]cost: 3635923
[2016-05-31 09:13:59]cost: 3887787
[2016-05-31 09:14:00]cost: 1623692
[2016-05-31 09:14:01]cost: 2524674
[2016-05-31 09:14:02]cost: 3383506
[2016-05-31 09:14:03]cost: 3898478
[2016-05-31 09:14:04]cost: 2120949
[2016-05-31 09:14:05]cost: 3756272
[2016-05-31 09:14:06]cost: 2877997
[2016-05-31 09:14:07]cost: 3432532
[2016-05-31 09:14:08]cost: 3638306
[2016-05-31 09:14:09]cost: 2958907
[2016-05-31 09:14:10]cost: 2742666
[2016-05-31 09:14:11]cost: 3024576
[2016-05-31 09:14:12]cost: 2822562
[2016-05-31 09:14:13]cost: 2623060
[2016-05-31 09:14:14]cost: 4045938


The latency is roughly 2 to 4 ms per tuple (the cost values are in nanoseconds), which seems too high to me. How can I reduce it?
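
As a rough check of that figure, the logged values can be summarized with a small stand-alone program. This is only a sketch: the class name LatencySummary and its helper methods are made up for illustration, and the numbers are copied from the log output above.

```java
import java.util.Arrays;

// Summarizes the "cost" values (nanoseconds) copied from the log output above.
// LatencySummary and its helpers are hypothetical names, not part of Storm.
class LatencySummary {

    static final long[] SAMPLES_NANOS = {
        1960336L, 2600239L, 3103449L, 3206544L, 3783647L, 3635923L,
        3887787L, 1623692L, 2524674L, 3383506L, 3898478L, 2120949L,
        3756272L, 2877997L, 3432532L, 3638306L, 2958907L, 2742666L,
        3024576L, 2822562L, 2623060L, 4045938L
    };

    static long min(long[] xs) {
        return Arrays.stream(xs).min().getAsLong();
    }

    static long max(long[] xs) {
        return Arrays.stream(xs).max().getAsLong();
    }

    static double mean(long[] xs) {
        return Arrays.stream(xs).average().getAsDouble();
    }

    // Converts nanoseconds to milliseconds.
    static double toMillis(double nanos) {
        return nanos / 1_000_000.0;
    }

    public static void main(String[] args) {
        System.out.printf("samples: %d%n", SAMPLES_NANOS.length);
        System.out.printf("min:  %.2f ms%n", toMillis(min(SAMPLES_NANOS)));
        System.out.printf("mean: %.2f ms%n", toMillis(mean(SAMPLES_NANOS)));
        System.out.printf("max:  %.2f ms%n", toMillis(max(SAMPLES_NANOS)));
    }
}
```

For these 22 samples this gives a minimum of about 1.62 ms, a mean of about 3.08 ms, and a maximum of about 4.05 ms. One caveat: the Java documentation only guarantees that System.nanoTime() differences are meaningful within a single JVM, so with two workers these numbers are best treated as approximate.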

Re: How to improve the intercommunication latency of spout/bolt

Posted by Kevin Conaway <ke...@gmail.com>.
Try using localOrShuffleGrouping instead of shuffleGrouping.  Storm will
attempt to pass messages directly to the next component within the same JVM,
if possible.
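
The rule behind this suggestion can be sketched as follows. This is a simplified model for illustration, not Storm's implementation: the class LocalOrShuffleSketch, its methods, and the worker names are hypothetical. It shows the documented behavior: if the target bolt has tasks in the sender's worker process, tuples are shuffled among just those tasks; otherwise the grouping acts like a normal shuffle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Simplified model of a local-or-shuffle choice. This is NOT Storm's code;
// the class, method, and worker names are hypothetical.
class LocalOrShuffleSketch {

    // Returns the target tasks a tuple may be sent to: only the tasks in the
    // sender's worker if there are any, otherwise every target task.
    static List<Integer> candidates(String senderWorker, Map<Integer, String> taskToWorker) {
        List<Integer> local = new ArrayList<>();
        for (Map.Entry<Integer, String> e : taskToWorker.entrySet()) {
            if (e.getValue().equals(senderWorker)) {
                local.add(e.getKey());
            }
        }
        return local.isEmpty() ? new ArrayList<>(taskToWorker.keySet()) : local;
    }

    // Picks one candidate at random, as a plain shuffle would.
    static int choose(String senderWorker, Map<Integer, String> taskToWorker, Random rnd) {
        List<Integer> c = candidates(senderWorker, taskToWorker);
        return c.get(rnd.nextInt(c.size()));
    }

    public static void main(String[] args) {
        Random rnd = new Random();

        // Bolt task 2 shares worker-1 with the spout, so only task 2 is a candidate.
        Map<Integer, String> shared = new TreeMap<>();
        shared.put(2, "worker-1");
        shared.put(3, "worker-2");
        System.out.println(candidates("worker-1", shared) + " -> " + choose("worker-1", shared, rnd));

        // No bolt task in the spout's worker: fall back to all target tasks.
        Map<Integer, String> remote = new TreeMap<>();
        remote.put(2, "worker-2");
        System.out.println(candidates("worker-1", remote));
    }
}
```

In the posted topology the change itself would be replacing shuffleGrouping("spout") with localOrShuffleGrouping("spout"). It can only help when the spout and bolt executors share a worker. With setNumWorkers(2) and one executor each, they will probably be placed in different workers, so setNumWorkers(1) may be needed as well.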



-- 
Kevin Conaway
http://www.linkedin.com/pub/kevin-conaway/7/107/580/
https://github.com/kevinconaway