Posted to user@storm.apache.org by zz <ar...@126.com> on 2018/06/26 02:29:23 UTC

performance problem (storm-kafka-client 1.1.1 and 1.1.2)

Hi,
I am a new Storm user and I have run into a performance problem. Please give me some suggestions.


dev environment:
server OS: CentOS 7.5
zookeeper: 3.4.10 (one node)
kafka: 2.11-1.1.0 (one node)
storm: 1.1.1 or 1.1.2
client OS: Windows 10


repro steps:
1. Send 1,000,000 messages to Kafka (topic name is "test"); each message is 500 bytes (see the producer sketch below).
2. Run the program in local mode with storm-core 1.1.1 and storm-kafka-client 1.1.1: it takes only about 20 s to consume the 1,000,000 messages.
3. Run the same program with the same settings (server and client) in local mode with storm-core 1.1.2 and storm-kafka-client 1.1.2: it takes about 15 minutes to consume the 1,000,000 messages.
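Step 1 can be reproduced with a plain Kafka producer along these lines (this is only a sketch of the shape of that step, not my exact code; the class name TestProducer and the payload content are placeholders):

package hjzh;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/** Fills the "test" topic with 1,000,000 messages of roughly 500 bytes each. */
public final class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.128.128:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Build a 500-byte payload.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 500; i++) {
            sb.append('a');
        }
        String payload = sb.toString();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000000; i++) {
                producer.send(new ProducerRecord<>("test", Integer.toString(i), payload));
            }
            producer.flush();
        }
    }
}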


I have read some of the code of storm-kafka-client 1.1.1 and storm-kafka-client 1.1.2; there were a lot of changes between the two versions. I would like to know how to configure storm-kafka-client 1.1.2, or which settings to change, to resolve the performance problem in 1.1.2.
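For example, I do not know whether the right knobs are things like the offset commit period, the cap on uncommitted offsets, or the Kafka consumer's max.poll.records. The sketch below is only a guess at which builder settings might matter; the values are arbitrary, not a recommendation:

Properties props = new Properties();
props.put("group.id", "group7");
props.put("max.poll.records", "1000");          // standard Kafka consumer setting

KafkaSpoutConfig<String, String> sc = KafkaSpoutConfig.builder("192.168.128.128:9092", "test")
        .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
        .setOffsetCommitPeriodMs(10000)          // how often the spout commits offsets
        .setMaxUncommittedOffsets(250000)        // how far the spout may read ahead of the last commit
        .setProp(props)
        .build();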


Thank you.
PS:
1. The attachments are the run logs of 1.1.1 and 1.1.2.
2. Below is the test code:


package hjzh;

import java.util.Date;
import java.util.Map;
import java.util.Properties;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
// import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/**
 * Hello world!
 */
public final class TestTopology {

    private TestTopology() {
    }

    /** Bolt that acks every tuple and prints a timestamp every 100,000 tuples. */
    public static class PerfBolt extends BaseRichBolt {

        private static final long serialVersionUID = 886149197481637894L;
        private static long counter = 1;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String tmp = input.getString(0);
            if (counter++ % 100000 == 0) {
                System.out.println(counter + ":" + new Date());
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare) {
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("group.id", "group7");
        KafkaSpoutConfig<String, String> sc = KafkaSpoutConfig.builder("192.168.128.128:9092", "test")
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.EARLIEST)
                .setProp(props)
                .build();

        TopologyBuilder tp = new TopologyBuilder();
        tp.setSpout("kafka_spout", new KafkaSpout<String, String>(sc), 1);
        tp.setBolt("kafka_bolt", new PerfBolt(), 1).shuffleGrouping("kafka_spout");

        Config conf = new Config();
        conf.setDebug(false);

        if (args == null || args.length == 0) {
            // No arguments: run in local mode for 10 minutes, then shut down.
            LocalCluster lc = new LocalCluster();
            lc.submitTopology("topology", conf, tp.createTopology());
            try {
                Thread.sleep(600000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            lc.shutdown();
        } else {
            try {
                StormSubmitter.submitTopology("topology", conf, tp.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        }
    }
}