You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by sowmiya kandasamy <> on 2014/01/22 14:26:54 UTC

topology is sometimes running and another time throws exception

Hi All,
we are running word count topology in local mode.When we use shut down method we are getting the errors like "" and entire process goes shutdown.when we use cluster.killtopology method the process cannot be terminated fully.While using both killtopology method and shutdown method sometimes topology is executed sometimes shows the error "" and entire process gets shutdown.
Can you please tell what will be the issue?

The topology is sometimes running and another time throws exception

This is our code:

    Main Program:::
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("word-reader",new WordReader());
    builder.setBolt("word-normalizer", new WordNormalizer())
    builder.setBolt("word-counter", new WordCounter(),2)
    .fieldsGrouping("word-normalizer", new Fields("word"));
    Config conf = new Config();
    //Topology run
    conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("wordcount", conf,
    Spout program::
    import java.util.Map;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    public  class  WordReader implements IRichSpout {
    private SpoutOutputCollector collector;
    Map<String, Object> count;
    private FileReader fileReader;
    private boolean completed = false;
    private TopologyContext context;
    public boolean isDistributed() {return false;}
    public void ack(Object msgId) {
    public void close() {}
    public void fail(Object msgId) {
    * The only thing that the methods will do It is emit each
    * file line
    public void nextTuple() {
    * The nextuple it is called forever, so if we have been readed the file
    * we will wait and then return
    try {
    } catch (InterruptedException e) {
    //Do nothing
    String str;
    //Open the reader
    BufferedReader reader = new BufferedReader(fileReader);
    //Read all lines
    while((str = reader.readLine()) != null){
    * By each line emmit a new value with the line as a their
    this.collector.emit(new Values(str),str);
    }catch(Exception e){
    throw new RuntimeException("Error reading tuple",e);
    completed = true;
    * We will create the file and get the collector object
    public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    try {
    this.context = context;
    this.fileReader = new FileReader(conf.get("wordsFile").toString());
    } catch (FileNotFoundException e) {
    throw new RuntimeException("Error reading file["+conf.get("wordFile")+"]");
    this.collector = collector;
    * Declare the output field "word"
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("str"));
    public void deactivate(){}
    public void activate(){}
    public Map<String, Object> getComponentConfiguration(){return count;}
    Bolt normalizer program:::
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    public class WordNormalizer implements IRichBolt {
    private OutputCollector collector;
    Map<String, Object> count;
    public void cleanup() {}
    * The bolt will receive the line from the
    * words file and process it to Normalize this line
    * The normalize will be put the words in lower case
    * and split the line to get all words in this
    public void execute(Tuple input) {
    String sentence = input.getString(0);
    String[] words = sentence.split(" ");
    for(String word : words){
    word = word.trim();
    word = word.toLowerCase();
    //Emit the word
    List a = new ArrayList();
    collector.emit(a,new Values(word));
    // Acknowledge the tuple
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.collector = collector;
    * The bolt will only emit the field "word"
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
    public Map<String, Object> getComponentConfiguration(){return count;}
    Bolt counter program::
    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Fields;
    public class  WordCounter implements IRichBolt {
    Integer id;
    String name;
    Map<String, Integer> counters;
    Map<String, Object> count;
    private OutputCollector collector;
    * At the end of the spout (when the cluster is shutdown
    * We will show the word counters
    public void cleanup() {
    System.out.println("-- Word Counter ["+name+"-"+id+"] --");
    for(Map.Entry<String, Integer> entry : counters.entrySet()){
    System.out.println(entry.getKey()+": "+entry.getValue());
    * On each word We will count
    public void execute(Tuple input) {
    String str = input.getString(0);
    * If the word dosn't exist in the map we will create
    * this, if not We will add 1
    counters.put(str, 1);
    Integer c = counters.get(str) + 1;
    counters.put(str, c);
    //Set the tuple as Acknowledge
    * On create
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.counters = new HashMap<String, Integer>();
    this.collector = collector; = context.getThisComponentId(); = context.getThisTaskId();
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("str", "c"));}
    public void deactivate(){}
    public Map<String, Object> getComponentConfiguration(){return count;}

Please guide us to solve the problem.
