Posted to user@storm.apache.org by 林海涛(信息技术部交易云技术研发组) <li...@gf.com.cn> on 2016/06/01 03:05:55 UTC
Re: Re: How to improve the intercommunication latency of spout/bolt
Thank you. Are there any other ways to optimize this, such as changing the Storm config? I set TOPOLOGY_DISRUPTOR_BATCH_SIZE and TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS to 1, but the latency only seems to improve when I also set the number of workers to 1.
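For reference, the settings mentioned above can be sketched as plain key/value pairs. This is only an illustration using the standard library: the string keys are assumed to be the ones behind Config.TOPOLOGY_DISRUPTOR_BATCH_SIZE, Config.TOPOLOGY_DISRUPTOR_BATCH_TIMEOUT_MILLIS and Config.TOPOLOGY_WORKERS in Storm 1.x, and the class name LowLatencyConfSketch is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class LowLatencyConfSketch {
    // Collects the three settings from the message as plain map entries.
    // The key strings are assumptions about Storm 1.x, not verified here.
    static Map<String, Object> lowLatencyConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.disruptor.batch.size", 1);           // flush every tuple
        conf.put("topology.disruptor.batch.timeout.millis", 1); // do not wait to fill a batch
        conf.put("topology.workers", 1);                        // keep all executors in one JVM
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(lowLatencyConf());
    }
}
```

In a real topology the same entries would presumably go into the Config object passed to submitTopology, since Config is itself a map of these keys.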
From: Kevin Conaway <ke...@gmail.com>
Reply-To: "user@storm.apache.org" <us...@storm.apache.org>
Date: Tuesday, May 31, 2016, 8:53 PM
To: "user@storm.apache.org" <us...@storm.apache.org>
Subject: Re: How to improve the intercommunication latency of spout/bolt
Try using localOrShuffleGrouping. Storm will attempt to pass messages directly to the next component within the same JVM, if possible.
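The idea behind that suggestion can be sketched with a simplified model. This is not Storm's implementation; the Task type, the worker ids, and the choose method are all hypothetical names used only to show the "prefer a target in the same worker, otherwise shuffle across all targets" rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class LocalOrShuffleSketch {
    // A target task together with the (hypothetical) id of the worker hosting it.
    static final class Task {
        final int taskId;
        final String workerId;
        Task(int taskId, String workerId) {
            this.taskId = taskId;
            this.workerId = workerId;
        }
    }

    // Picks randomly among targets in the sender's worker; if there are none,
    // falls back to picking randomly among all targets.
    static Task choose(List<Task> targets, String senderWorker, Random rnd) {
        List<Task> local = new ArrayList<>();
        for (Task t : targets) {
            if (t.workerId.equals(senderWorker)) {
                local.add(t);
            }
        }
        List<Task> pool = local.isEmpty() ? targets : local;
        return pool.get(rnd.nextInt(pool.size()));
    }

    public static void main(String[] args) {
        List<Task> targets = new ArrayList<>();
        targets.add(new Task(1, "worker-a"));
        targets.add(new Task(2, "worker-b"));
        Task picked = choose(targets, "worker-a", new Random());
        System.out.println("picked task " + picked.taskId + " on " + picked.workerId);
    }
}
```

In the topology itself the change would be to call localOrShuffleGrouping("spout") on the bolt declaration where shuffleGrouping("spout") is used now. Note that the local path can only apply when the spout and the bolt end up in the same worker process.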
On Tuesday, May 31, 2016, 林海涛 <li...@gf.com.cn> wrote:
Hello.
I ran a test with a simple topology to measure the communication latency between a spout and a bolt. The spout just emits the current nanosecond timestamp (System.nanoTime()), and the bolt prints the time difference when it receives the tuple.
I deployed my Storm cluster on my own machine in Docker containers (one nimbus, one supervisor) and ran the topology in cluster mode.
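As a point of comparison, the same timestamp-difference measurement can be sketched inside a single JVM using only the standard library. This is a hypothetical baseline, not part of the topology: the class name HandoffLatencySketch and the queue-based handoff between two threads are illustrative assumptions. It also avoids one caveat of the approach, namely that System.nanoTime() values are only guaranteed to be comparable within one JVM, so differences taken across two worker processes should be read with care.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class HandoffLatencySketch {
    // Sends `samples` timestamps from a producer thread to the calling thread
    // and returns the observed handoff delay of each one, in nanoseconds.
    static long[] measure(int samples) throws InterruptedException {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < samples; i++) {
                queue.add(System.nanoTime()); // plays the role of the spout's emit
                try {
                    Thread.sleep(10);         // pause between emits
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        producer.start();

        long[] costs = new long[samples];
        for (int i = 0; i < samples; i++) {
            long sent = queue.take();               // plays the role of the bolt's execute
            costs[i] = System.nanoTime() - sent;
        }
        producer.join();
        return costs;
    }

    public static void main(String[] args) throws InterruptedException {
        for (long cost : measure(5)) {
            System.out.println("cost: " + cost);
        }
    }
}
```

The numbers this prints depend on the machine and scheduler, so they are only useful as a rough lower bound to compare against what the topology reports.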
The code is below:
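// Imports assume the Storm 1.x package names (org.apache.storm.*).
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class RandomSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void nextTuple() {
        // Emit one tuple per second carrying the emit-time timestamp.
        Utils.sleep(1000);
        long currentTime = System.nanoTime();
        _collector.emit(new Values(currentTime));
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        arg0.declare(new Fields("value"));
    }
}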
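// Imports assume the Storm 1.x package names (org.apache.storm.*).
// LogFileWriter is my own helper class (not shown here).
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class PrintBolt extends BaseRichBolt {
    private LogFileWriter _logFile;

    public void execute(Tuple arg0) {
        // Log the time elapsed (in nanoseconds) since the spout emitted the tuple.
        long prevTime = arg0.getLong(0);
        long currentTime = System.nanoTime();
        _logFile.writeLog("cost: " + (currentTime - prevTime));
    }

    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        try {
            _logFile = new LogFileWriter("StormTest", this.getClass().getSimpleName());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // This bolt never emits, so the declaration is not strictly required.
        arg0.declare(new Fields("value"));
    }
}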
// Imports assume the Storm 1.x package names (org.apache.storm.*).
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class Topology
{
    public static void main( String[] args )
    {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSpout(), 1);
        builder.setBolt("bolt", new PrintBolt(), 1).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(false);

        if (args.length > 0) {
            // Cluster submit: two workers, no ackers.
            conf.setNumWorkers(2);
            conf.setNumAckers(0);
            try {
                StormSubmitter.submitTopology("stormTest", conf, builder.createTopology());
            } catch (Exception e) {
                e.printStackTrace();
            }
        } else {
            // No arguments: run in local mode.
            new LocalCluster().submitTopology("stormTest", conf, builder.createTopology());
        }
    }
}
The output is below (the cost values are in nanoseconds):
[2016-05-31 09:13:53]cost: 1960336
[2016-05-31 09:13:54]cost: 2600239
[2016-05-31 09:13:55]cost: 3103449
[2016-05-31 09:13:56]cost: 3206544
[2016-05-31 09:13:57]cost: 3783647
[2016-05-31 09:13:58]cost: 3635923
[2016-05-31 09:13:59]cost: 3887787
[2016-05-31 09:14:00]cost: 1623692
[2016-05-31 09:14:01]cost: 2524674
[2016-05-31 09:14:02]cost: 3383506
[2016-05-31 09:14:03]cost: 3898478
[2016-05-31 09:14:04]cost: 2120949
[2016-05-31 09:14:05]cost: 3756272
[2016-05-31 09:14:06]cost: 2877997
[2016-05-31 09:14:07]cost: 3432532
[2016-05-31 09:14:08]cost: 3638306
[2016-05-31 09:14:09]cost: 2958907
[2016-05-31 09:14:10]cost: 2742666
[2016-05-31 09:14:11]cost: 3024576
[2016-05-31 09:14:12]cost: 2822562
[2016-05-31 09:14:13]cost: 2623060
[2016-05-31 09:14:14]cost: 4045938
As you can see, the latency is roughly 2 to 4 ms. That is too high for my use case. How can I reduce it?
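To put a number on the log above, the listed cost values can be summarized with a small standalone program. The class name CostStats is hypothetical; the values are copied from the output as posted.

```java
import java.util.Arrays;

class CostStats {
    // The 22 "cost" values from the log, in nanoseconds.
    static final long[] COSTS_NS = {
        1960336, 2600239, 3103449, 3206544, 3783647, 3635923, 3887787, 1623692,
        2524674, 3383506, 3898478, 2120949, 3756272, 2877997, 3432532, 3638306,
        2958907, 2742666, 3024576, 2822562, 2623060, 4045938
    };

    static long min(long[] values) {
        return Arrays.stream(values).min().getAsLong();
    }

    static long max(long[] values) {
        return Arrays.stream(values).max().getAsLong();
    }

    // Mean of the samples, converted from nanoseconds to milliseconds.
    static double meanMillis(long[] values) {
        return Arrays.stream(values).average().getAsDouble() / 1_000_000.0;
    }

    public static void main(String[] args) {
        System.out.printf("samples=%d min=%.2f ms max=%.2f ms mean=%.2f ms%n",
                COSTS_NS.length,
                min(COSTS_NS) / 1_000_000.0,
                max(COSTS_NS) / 1_000_000.0,
                meanMillis(COSTS_NS));
    }
}
```

For these 22 samples the minimum is about 1.6 ms, the maximum about 4.0 ms, and the mean about 3.1 ms.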
--
Kevin Conaway
http://www.linkedin.com/pub/kevin-conaway/7/107/580/
https://github.com/kevinconaway