Posted to user@storm.apache.org by Michail Toutoudakis <mi...@gmail.com> on 2015/06/07 11:22:36 UTC

What is the best way of implementing a file reader spout in local mode

I am trying to read some data from a text file and process it; I am currently using a Scanner. At first everything works fine for the first 10,000 values, but then it looks like no further input lines are sent to the bolt that implements the algorithm. Finally, after a few minutes of running, I get java.lang.OutOfMemoryError: unable to create new native thread

The file reader spout implementation is:

package tuc.LSH.storm.spouts;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;

import java.io.*;
import java.util.Map;
import java.util.Scanner;

/**
 * Created by mixtou on 15/5/15.
 */
public class FileReaderSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private Scanner scanner;
    private boolean completed;
    private TopologyContext context;
    private int spout_idx;
    private int spout_id;
    private Map config;
    private int noOfFailedWords;
    private int noOfAckedWords;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("data", new Fields("streamId", "timestamp", "value"));
    }

    @Override
    public void open(Map config, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.context = topologyContext;
        this.spout_idx = context.getThisTaskIndex();
        this.spout_id = context.getThisTaskId();
        this.collector = spoutOutputCollector;
        this.config = config;
        this.completed = false;
        this.noOfFailedWords = 0;
        this.noOfAckedWords = 0;

        try {
            this.scanner = new Scanner(new File(config.get(file_to_read()).toString()));
            System.err.println("Scanner Reading File: " + config.get(file_to_read()).toString() + " Spout index: " + spout_idx);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void nextTuple() {

        if(!completed) {
            if (scanner.hasNextLine()) {
                String[] temp = scanner.nextLine().split(",");
                // fields: 0 - id, 2 - timestamp, 3 - value
                collector.emit("data", new Values(temp[0], temp[2], temp[3]), temp[0]); // passing temp[0] as the message id enables tracking, so ack()/fail() will be called for this tuple
                Utils.sleep(1);
            } else {
                System.err.println("End of File Closing Reader");
                scanner.close();
                completed = true;
            }
        }

    }

    private String file_to_read() {
//        this.spout_id = context.getThisTaskId();
        if (Consts.NO_OF_SPOUTS > 1) {
            int file_no = spout_idx % Consts.NO_OF_SPOUTS;
            return "data" + file_no;
        } else {
            return "data";
        }
    }

    @Override
    public void ack(Object msgId) {
        super.ack(msgId);
        noOfAckedWords++;
//        System.out.println("OK tuple acked from bolt: " + msgId + " no of acked word " + noOfAckedWords);
        System.out.println("no of acked tuples: "+noOfAckedWords);
    }

    @Override
    public void fail(Object msgId) {
        super.fail(msgId);
        noOfFailedWords++;
        System.err.println("ERROR: " + context.getThisComponentId() + " " + msgId + " no of words failed " + noOfFailedWords);

    }

}
And the bolt that receives the tuples from the file reader spout and processes them is:

package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;
import tuc.LSH.core.timeseries.UniversalBasicWindow;


import java.util.*;

/**
 * Created by mixtou on 17/5/15.
 */
public class LSHBolt extends BaseRichBolt {
    private int task_id;
    private OutputCollector collector;
    private UniversalBasicWindow universalBasicWindow;

    private String streamId;
    private String time;
    private Float value;

    @Override
    public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        this.universalBasicWindow = new UniversalBasicWindow();
        streamId = null;
        time = null;
        value = 0f;
        System.err.println("New Bolt with id: " + task_id);

    }

    @Override
    public void execute(Tuple tuple) {

        if (tuple.getSourceStreamId().equals("sync")) {
            System.out.println("Bolt task id " + task_id + " received from " + tuple.getSourceComponent() + " message " + tuple.getString(0));
            System.out.println("Normalizing: Basic Window of Bolt " + task_id);
            universalBasicWindow.normalize(); // pad the remaining streams with the last received value so they all have the same length
            universalBasicWindow = new UniversalBasicWindow(); // start a fresh window; the old one becomes garbage
//            Utils.sleep(1);
        }

        if (tuple.getSourceStreamId().equals("data")) {

            streamId = tuple.getStringByField("streamId");
            time = tuple.getStringByField("timestamp");
            value = Float.parseFloat(tuple.getStringByField("value"));

            universalBasicWindow.pushStream(streamId, value);

            if (universalBasicWindow.isFull(task_id)) { //check if any stream of the window is full

//                System.out.println("Univ. Basic Window of bolt " + task_id + " is Filled Up");

                collector.emit("bwFilled", new Values(task_id));
                Utils.sleep(1);

//                universalBasicWindow.normalize();
//                universalBasicWindow = new UniversalBasicWindow();

//                TODO:: Add basic window to sliding window and clear

            }

        }


//        System.err.println("SourceComponent: "+tuple.getSourceComponent());
//        System.err.println("SourceStreamId: "+tuple.getSourceStreamId());
//        System.err.println("Source Task: "+tuple.getSourceTask());
//        System.err.println("SourceGlobalStreamId: "+tuple.getSourceGlobalStreamid());
//        System.err.println("MessageId: "+tuple.getMessageId());

        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("bwFilled", new Fields("task_id"));

    }
}
The file reader spout stops acking values once universalBasicWindow is full.

The bolt that consumes the "bwFilled" stream is just a bolt that sends a reset back to LSHBolt so that all instances reset their basic window.

The ResetBolt implementation is:

package tuc.LSH.storm.bolts;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import tuc.LSH.core.hashfunctions.HashFunctionsGen;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Created by mixtou on 2/6/15.
 */
public class ResetBolt extends BaseRichBolt {

    private int task_id;
    private OutputCollector collector;
    //    private int noOfFilledBoltsInstaces;
    private Set<Integer> filledBoltInstances;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {

        this.task_id = topologyContext.getThisTaskIndex();
        this.collector = outputCollector;
        this.filledBoltInstances = new HashSet<>();
        HashFunctionsGen.generateHashFunctionsForUBW();

    }

    @Override
    public void execute(Tuple tuple) {

        if (tuple.getSourceStreamId().equals("bwFilled")) {
//            System.err.println("Reset Bolt Received window filled from bolt with task id " + tuple.getInteger(0));
//            filledBoltInstances.add(tuple.getInteger(0));

            //GENERATE NEW HASH FUNCTIONS WHEN ALL BASIC WINDOW INSTANCES ARE FILLED??????
//            if (filledBoltInstances.size() == Consts.NO_OF_LSH_BOLTS) {
//                filledBoltInstances.clear();
////                System.out.println("Updating Basic Window Hash Functions");
////                HashFunctionsGen.clear();
////                HashFunctionsGen.generateHashFunctionsForUBW();
//
//            }
            collector.emit("sync", new Values("Reset"));
//            Utils.sleep(1);
//            System.err.println("Emitted ResetBW to all LSH Bolt Instances");
            collector.ack(tuple);
        }

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("sync", new Fields("message"));
    }
}
Finally, the topology implementation is:

package tuc.LSH.storm.topologies;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import tuc.LSH.conf.Consts;
import tuc.LSH.core.timeseries.UniversalBasicWindow;
import tuc.LSH.storm.bolts.LSHBolt;
import tuc.LSH.storm.bolts.ResetBolt;
import tuc.LSH.storm.spouts.FileReaderSpout;

import java.util.Map;

/**
 * Created by mixtou on 13/5/15.
 */
public class LSHTopology {

    public static void main(String[] args) throws Exception{

        TopologyBuilder builder = new TopologyBuilder();

//        builder.setSpout("RandomStreamSpout", new RandomStreamSpout(), Consts.NO_OF_SPOUTS);
        builder.setSpout("FileReaderSpout", new FileReaderSpout(), Consts.NO_OF_SPOUTS);
        BoltDeclarer lshBolt = builder.setBolt("LSH", new LSHBolt(), Consts.NO_OF_LSH_BOLTS)
                .fieldsGrouping("FileReaderSpout", "data", new Fields("streamId"));
        builder.setBolt("ResetBolt", new ResetBolt(), 1).shuffleGrouping("LSH", "bwFilled");

        lshBolt.allGrouping("ResetBolt", "sync");

        Config config = new Config();

        for(Map.Entry<String, String> entry : Consts.data_files.entrySet()){
            config.put(entry.getKey(), entry.getValue());
        }

        config.setDebug(false);
        config.setFallBackOnJavaSerialization(false);
//        config.registerSerialization(UniversalBasicWindow.class);



        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LSHTopology", config, builder.createTopology());

            Utils.sleep(500000); // let the topology run (~8 minutes) before killing it
            cluster.killTopology("LSHTopology");
            cluster.shutdown();

        }

    }

}


Re: What is the best way of implementing a file reader spout in local mode

Posted by Enno Shioji <es...@gmail.com>.
Somewhere in your code you are starting far too many threads (more than a
thousand). I don't see that in the code you posted, so it must be in one
of the classes you haven't posted.
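
For illustration only (a made-up example, not from your classes), the
classic pattern that produces exactly this error is constructing a new
thread pool or thread-spawning client per call and never shutting it down:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical leak: each iteration creates a fixed pool whose worker
// threads are non-daemon and are never shut down, so live threads keep
// growing until the JVM fails with "unable to create new native thread".
public class ThreadLeakDemo {
    public static void main(String[] args) {
        while (true) {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            pool.submit(() -> System.out.println("work"));
            // missing: pool.shutdown();
        }
    }
}

If something like that is hiding in an execute() or nextTuple() body, move
the construction into prepare()/open() so it happens once per task.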

Are you using multithreading anywhere? Are you instantiating services that
spawn threads (like network clients)? If you can't tell where it is, you
can look at a thread dump to see what these threads are. For this you can
use profilers, debuggers, or jvisualvm, or you can send the process SIGQUIT
with `kill -3 <pid>` and the JVM will print a thread dump to stdout
(despite the name, this does not kill the process).
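
If you'd rather not attach a tool, here is a minimal (untested) sketch that
prints the same information from inside the JVM using only the standard
library; call it periodically, e.g. from a bolt's prepare() or a timer:

import java.util.Map;

// Sketch: log every live thread's name and state. Watching which names
// multiply usually points straight at the leaking component.
public class ThreadDump {
    public static void dump() {
        Map<Thread, StackTraceElement[]> all = Thread.getAllStackTraces();
        System.err.println("Live threads: " + all.size());
        for (Thread t : all.keySet()) {
            System.err.println("  " + t.getName() + " [" + t.getState()
                    + (t.isDaemon() ? ", daemon]" : "]"));
        }
    }
}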



On Sun, Jun 7, 2015 at 10:22 AM, Michail Toutoudakis <mi...@gmail.com>
wrote:

> I am trying to read some data from a text file and process it; I am
> currently using a Scanner. At first everything works fine for the first
> 10,000 values, but then it looks like no further input lines are sent to
> the bolt that implements the algorithm. Finally, after a few minutes of
> running, I get java.lang.OutOfMemoryError: unable to create new native
> thread
>
> [snip]